# code=utf-8 import os, sys root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler" sys.path.append(root_path) from utils.MysqlData import MysqlPoolClient from utils.push_kafka import SKafka from telethon import TelegramClient, events import json from telethon.sessions import StringSession from telethon.tl.types import User import time from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \ TG_ROBOT_ACCOUNT_TABLE import telethon, asyncio from tg_utils.tg_api import * from utils.upload_files import upload_file """ 监控目标群组聊天信息(包括资源下载、头像处理) nohup python receive_message.py > rece.log 2>&1 & """ def get_group(api_id): """ 加载需要监听的群组 修改为黑名单 TODO:可修改为从sql读取 :return: """ with open("group_ids.json", 'r') as f: chat_names = json.load(f) group_ids = [group["group_id"] for group in chat_names if group["group_id"] and not group["is_group"]] # 目前是 退群了仍旧会被监控到? 采用黑名单 下列群组不再接受监控 :( group_ids = [ -1001440842385, -1001121481396, -1001643896726, -1001156230593, -1001964495180, -1001510876777, -1001878335287, -1001360128984, -1001461057234, -1001144411934, -1001790944354, -1001759695404, -1001643896726, -1002001492389 ] logger.success(f"load group lengths {len(group_ids) if group_ids else 0}") return group_ids def push_kafka(kp, data, topic): """ kafka推送 :param kp: :param data: :param topic: :return: """ try: if isinstance(data, list): kp.sync_producer(topic, data) else: kp.sync_producer(topic, [data]) except Exception as e: logger.error(f"数据推送失败:{data}") logger.error(f"kafka error: {e}") async def update_filename(message, media, file="resources"): """ 按照规则修改 下载并 上传文件 :param message: :param media: :param file: :return: """ file_type = telethon.utils.get_extension(media) logger.info(f"捕获文件类型:{file_type}") if "." in file_type: # 修改文件名称 time_stamp = str(int(time.time())) file = f"{file}/{time_stamp}_{message.id}{file_type}" filename = await download_media(message, file=file) upload_path = await upload_file(filename) # 上传文件 return upload_path else: filename = await download_media(message) upload_path = await upload_file(filename) # 上传文件 return upload_path async def download_resources(message, media, client, sender, file="resources"): """ 处理聊天记录中的资源文件下载并上传 :param message: :param media: :param client: :param sender: :param file: :return: """ new_file_name = None # 对于没有媒体类的消息 直接不下载了 if media: file_type = telethon.utils.get_extension(media) # logger.info(f"捕获文件类型:{file_type}") if "." in file_type: # 能识别出来类型的 修改文件名称 time_stamp = str(int(time.time())) new_file_name = f"{file}/{time_stamp}_{message.id}{file_type}" else: new_file_name = file # 下载telegram 媒体类文件 media_path, photo_path = await asyncio.gather( download_media(message, file=new_file_name), # 媒体文件下载 download_profile_photo(client, sender) # 下载头像 ) logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;") # 上传go-fast upload_media_path, upload_photo_path = await asyncio.gather( upload_file(media_path), upload_file(photo_path) ) logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;") return upload_media_path, upload_photo_path async def main(session_string, api_id, api_hash, kp=None, message_topic=""): session = StringSession(session_string) logger.info(f"客户端 {api_id} 启动!!!!") client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY) await client.start() @client.on(events.NewMessage(chats=get_group(api_id), incoming=True, blacklist_chats=True)) async def handler(event): """ 拦截 消息 可选只拦截目标群(chats控制) :param event: :return: """ sender = await event.get_sender() # 群组接受的为user 频道则接受的为频道 chat = await event.get_chat() # if not sender or (hasattr(sender, "bot") and sender.bot): # 群组机器人过滤 # logger.info(f"过滤机器人测试: {sender}") # return if not sender: # 没有sender可能也有消息 logger.info(f" 不存在发送者 群组异常: {chat}") # logger.info(f"meesages: {event}") # logger.info(f"messages: {event.message}") sender = chat # 一般此时为channel if isinstance(sender, User): sender_name = str(sender.first_name) + ' ' + str(sender.last_name) else: # 如果是频道的话 发送者是以频道为对象发送 sender_name = sender.title message = event.message media = event.media other_link = await get_extra_linked(message) # 获得超链接 file_list, user_photo = await download_resources(message, media, client, sender) message_text = message.message date = message.date replay = message.reply_to # MessageReplyHeader 对象 data = { "group_title": chat.title, "group_id": -1000000000000 - chat.id, "username": str(chat.username), "sender_name": sender_name, "sender_id": str(sender.id), "media": file_list, "sender_photo": user_photo, "message_id": message.id, "reply_to_msg_id": replay.reply_to_msg_id if replay else None, # 这个字段是针对某一条聊天记录的回复 ***** "reply_to_top_id": replay.reply_to_top_id if replay else None, # 这个字段是针对频道内的讨论组 针对的某一条聊天内容发起的讨论 "message_text": message_text, "other_link": other_link, # 聊天过程中的超链接 以及对应的位置 "datetime": date.strftime("%Y-%m-%d %H:%M:%S") } logger.debug(f"client: {api_id} ; data: {data}") # logger.debug(f"message: {message}") push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic) if client: logger.info("client start") await client.run_until_disconnected() async def mul_account(): client_mysql = MysqlPoolClient(CRAWLER_DB_CONF) # 排除 18443104914(有效)、18846824798(无效) 账号 sql = f"select * from {TG_ROBOT_ACCOUNT_TABLE} where api_id='22955009' order by update_time" results = client_mysql.getAll(sql) kp = SKafka(bootstrap_servers=TOPIC_ADDRESS) message_topic = TOPIC_DIC["testtelegram"] logger.info(f"链接 kafka {TOPIC_ADDRESS}:{message_topic} 成功") tasks = [] for i in results: api_id = i["api_id"] api_hash = i["api_hash"] session_string = i["session_string"] logger.info(i) tasks.append(main(session_string, api_id, api_hash, kp, message_topic)) logger.info(f"获得任务数量 {len(tasks)}") await asyncio.gather(*tasks) if __name__ == '__main__': # main() loop = asyncio.get_event_loop() try: loop.run_until_complete(mul_account()) except Exception as e: print(f"An error occurred: {e}") # mul_account() # pass