You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
7.7 KiB
219 lines
7.7 KiB
# code=utf-8
|
|
import os, sys
|
|
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler"
|
|
sys.path.append(root_path)
|
|
|
|
from utils.MysqlData import MysqlPoolClient
|
|
from utils.push_kafka import SKafka
|
|
from telethon import TelegramClient, events
|
|
import json
|
|
from telethon.sessions import StringSession
|
|
from telethon.tl.types import User
|
|
import time
|
|
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
|
|
TG_ROBOT_ACCOUNT_TABLE
|
|
import telethon, asyncio
|
|
|
|
from tg_utils.tg_api import *
|
|
from utils.upload_files import upload_file
|
|
|
|
"""
|
|
监控目标群组聊天信息(包括资源下载、头像处理)
|
|
nohup python receive_message.py > rece.log 2>&1 &
|
|
"""
|
|
|
|
|
|
def get_group(api_id):
|
|
"""
|
|
加载需要监听的群组
|
|
修改为黑名单
|
|
TODO:可修改为从sql读取
|
|
:return:
|
|
"""
|
|
with open("group_ids.json", 'r') as f:
|
|
chat_names = json.load(f)
|
|
|
|
group_ids = [group["group_id"] for group in chat_names if group["group_id"] and not group["is_group"]]
|
|
|
|
# 目前是 退群了仍旧会被监控到? 采用黑名单 下列群组不再接受监控 :(
|
|
group_ids = [
|
|
-1001440842385, -1001121481396, -1001643896726,
|
|
-1001156230593, -1001964495180, -1001510876777,
|
|
-1001878335287, -1001360128984, -1001461057234,
|
|
-1001144411934, -1001790944354, -1001759695404,
|
|
-1001643896726, -1002001492389
|
|
]
|
|
logger.success(f"load group lengths {len(group_ids) if group_ids else 0}")
|
|
return group_ids
|
|
|
|
|
|
def push_kafka(kp, data, topic):
|
|
"""
|
|
kafka推送
|
|
:param kp:
|
|
:param data:
|
|
:param topic:
|
|
:return:
|
|
"""
|
|
try:
|
|
if isinstance(data, list):
|
|
kp.sync_producer(topic, data)
|
|
else:
|
|
kp.sync_producer(topic, [data])
|
|
except Exception as e:
|
|
logger.error(f"数据推送失败:{data}")
|
|
logger.error(f"kafka error: {e}")
|
|
|
|
|
|
async def update_filename(message, media, file="resources"):
|
|
"""
|
|
按照规则修改 下载并 上传文件
|
|
:param message:
|
|
:param media:
|
|
:param file:
|
|
:return:
|
|
"""
|
|
file_type = telethon.utils.get_extension(media)
|
|
logger.info(f"捕获文件类型:{file_type}")
|
|
if "." in file_type: # 修改文件名称
|
|
time_stamp = str(int(time.time()))
|
|
file = f"{file}/{time_stamp}_{message.id}{file_type}"
|
|
filename = await download_media(message, file=file)
|
|
upload_path = await upload_file(filename) # 上传文件
|
|
return upload_path
|
|
else:
|
|
filename = await download_media(message)
|
|
upload_path = await upload_file(filename) # 上传文件
|
|
return upload_path
|
|
|
|
|
|
async def download_resources(message, media, client, sender, file="resources"):
|
|
"""
|
|
处理聊天记录中的资源文件下载并上传
|
|
|
|
:param message:
|
|
:param media:
|
|
:param client:
|
|
:param sender:
|
|
:param file:
|
|
:return:
|
|
"""
|
|
new_file_name = None # 对于没有媒体类的消息 直接不下载了
|
|
if media:
|
|
file_type = telethon.utils.get_extension(media)
|
|
# logger.info(f"捕获文件类型:{file_type}")
|
|
if "." in file_type: # 能识别出来类型的 修改文件名称
|
|
time_stamp = str(int(time.time()))
|
|
new_file_name = f"{file}/{time_stamp}_{message.id}{file_type}"
|
|
else:
|
|
new_file_name = file
|
|
# 下载telegram 媒体类文件
|
|
media_path, photo_path = await asyncio.gather(
|
|
download_media(message, file=new_file_name), # 媒体文件下载
|
|
download_profile_photo(client, sender) # 下载头像
|
|
)
|
|
|
|
logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")
|
|
# 上传go-fast
|
|
upload_media_path, upload_photo_path = await asyncio.gather(
|
|
upload_file(media_path),
|
|
upload_file(photo_path)
|
|
)
|
|
logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")
|
|
|
|
return upload_media_path, upload_photo_path
|
|
|
|
|
|
async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
|
|
session = StringSession(session_string)
|
|
logger.info(f"客户端 {api_id} 启动!!!!")
|
|
client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
|
|
await client.start()
|
|
|
|
@client.on(events.NewMessage(chats=get_group(api_id), incoming=True, blacklist_chats=True))
|
|
async def handler(event):
|
|
"""
|
|
拦截 消息
|
|
可选只拦截目标群(chats控制)
|
|
:param event:
|
|
:return:
|
|
"""
|
|
sender = await event.get_sender() # 群组接受的为user 频道则接受的为频道
|
|
chat = await event.get_chat()
|
|
|
|
# if not sender or (hasattr(sender, "bot") and sender.bot): # 群组机器人过滤
|
|
# logger.info(f"过滤机器人测试: {sender}")
|
|
# return
|
|
if not sender: # 没有sender可能也有消息
|
|
logger.info(f" 不存在发送者 群组异常: {chat}")
|
|
# logger.info(f"meesages: {event}")
|
|
# logger.info(f"messages: {event.message}")
|
|
sender = chat # 一般此时为channel
|
|
|
|
if isinstance(sender, User):
|
|
sender_name = str(sender.first_name) + ' ' + str(sender.last_name)
|
|
else: # 如果是频道的话 发送者是以频道为对象发送
|
|
sender_name = sender.title
|
|
|
|
message = event.message
|
|
media = event.media
|
|
other_link = await get_extra_linked(message) # 获得超链接
|
|
file_list, user_photo = await download_resources(message, media, client, sender)
|
|
message_text = message.message
|
|
date = message.date
|
|
replay = message.reply_to # MessageReplyHeader 对象
|
|
data = {
|
|
"group_title": chat.title,
|
|
"group_id": -1000000000000 - chat.id,
|
|
"username": str(chat.username),
|
|
"sender_name": sender_name,
|
|
"sender_id": str(sender.id),
|
|
"media": file_list,
|
|
"sender_photo": user_photo,
|
|
"message_id": message.id,
|
|
"reply_to_msg_id": replay.reply_to_msg_id if replay else None, # 这个字段是针对某一条聊天记录的回复 *****
|
|
"reply_to_top_id": replay.reply_to_top_id if replay else None, # 这个字段是针对频道内的讨论组 针对的某一条聊天内容发起的讨论
|
|
"message_text": message_text,
|
|
"other_link": other_link, # 聊天过程中的超链接 以及对应的位置
|
|
"datetime": date.strftime("%Y-%m-%d %H:%M:%S")
|
|
}
|
|
|
|
logger.debug(f"client: {api_id} ; data: {data}")
|
|
# logger.debug(f"message: {message}")
|
|
push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)
|
|
|
|
if client:
|
|
logger.info("client start")
|
|
|
|
await client.run_until_disconnected()
|
|
|
|
|
|
async def mul_account():
|
|
client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
|
|
sql = f"select * from {TG_ROBOT_ACCOUNT_TABLE} where api_id!='28340634' order by update_time"
|
|
results = client_mysql.getAll(sql)
|
|
kp = SKafka(bootstrap_servers=TOPIC_ADDRESS)
|
|
message_topic = TOPIC_DIC["testtelegram"]
|
|
logger.info(f"链接 kafka {TOPIC_ADDRESS}:{message_topic} 成功")
|
|
|
|
tasks = []
|
|
for i in results:
|
|
api_id = i["api_id"]
|
|
api_hash = i["api_hash"]
|
|
session_string = i["session_string"]
|
|
logger.info(i)
|
|
tasks.append(main(session_string, api_id, api_hash, kp, message_topic))
|
|
|
|
logger.info(f"获得任务数量 {len(tasks)}")
|
|
await asyncio.gather(*tasks)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# main()
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
loop.run_until_complete(mul_account())
|
|
except Exception as e:
|
|
print(f"An error occurred: {e}")
|
|
# mul_account()
|
|
# pass
|