telegram 群组监控 / 群组功能
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

219 lines
7.7 KiB

# -*- coding: utf-8 -*-
import os, sys
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram-crawler')[0] + "telegram-crawler"
sys.path.append(root_path)
from utils.MysqlData import MysqlPoolClient
from utils.push_kafka import SKafka
from telethon import TelegramClient, events
import json
from telethon.sessions import StringSession
from telethon.tl.types import User
import time
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
TG_ROBOT_ACCOUNT_TABLE
import telethon, asyncio
from tg_utils.tg_api import *
from utils.upload_files import upload_file
"""
监控目标群组聊天信息(包括资源下载、头像处理)
nohup python receive_message.py > rece.log 2>&1 &
"""
def get_group(api_id):
    """Return the chat ids that must NOT be monitored (a blacklist).

    The ids are marked peer ids (``-100<channel id>``). They are hard-coded
    here; no file is read, because the ids listed in ``group_ids.json`` were
    never used.
    TODO: load the blacklist from SQL instead.

    :param api_id: id of the account requesting the list; currently unused,
        kept for interface compatibility.
    :return: list of blacklisted chat ids, without duplicates.
    """
    # Chats that were left kept being delivered, so a blacklist is used:
    # messages from these chats are ignored.
    group_ids = [
        -1001440842385, -1001121481396, -1001643896726,
        -1001156230593, -1001964495180, -1001510876777,
        -1001878335287, -1001360128984, -1001461057234,
        -1001144411934, -1001790944354, -1001759695404,
        -1002001492389,
    ]
    logger.success(f"load group lengths {len(group_ids) if group_ids else 0}")
    return group_ids
def push_kafka(kp, data, topic):
    """Send ``data`` to the Kafka ``topic`` through ``kp.sync_producer``.

    A list is forwarded unchanged; any other value is wrapped in a
    one-element list. Failures are logged (payload and error) and never
    re-raised.

    :param kp: producer object exposing ``sync_producer(topic, records)``.
    :param data: a single record, or a list of records.
    :param topic: destination Kafka topic.
    :return: None
    """
    records = data if isinstance(data, list) else [data]
    try:
        kp.sync_producer(topic, records)
    except Exception as e:
        logger.error(f"数据推送失败:{data}")
        logger.error(f"kafka error: {e}")
async def update_filename(message, media, file="resources"):
    """Download a message's media, renaming it when its type is known, then upload it.

    :param message: the Telethon message whose media is downloaded.
    :param media: media object used to detect the file extension.
    :param file: target directory used when the extension is recognised.
    :return: whatever ``upload_file`` returns for the downloaded path.
    """
    extension = telethon.utils.get_extension(media)
    logger.info(f"捕获文件类型:{extension}")
    download_kwargs = {}
    if "." in extension:
        # Recognised type: save as "<dir>/<unix timestamp>_<message id><ext>".
        download_kwargs["file"] = f"{file}/{int(time.time())}_{message.id}{extension}"
    # Unrecognised type: download_media is called without a `file` argument.
    downloaded = await download_media(message, **download_kwargs)
    return await upload_file(downloaded)  # upload the downloaded file
async def download_resources(message, media, client, sender, file="resources"):
    """Download a message's media and the sender's profile photo, then upload both.

    :param message: the Telethon message that may carry media.
    :param media: the message's media; falsy when the message has none.
    :param client: ``TelegramClient`` passed to ``download_profile_photo``.
    :param sender: entity whose profile photo is downloaded.
    :param file: directory used for media whose extension is recognised.
    :return: ``(upload_media_path, upload_photo_path)`` as returned by ``upload_file``.
    """
    target = None
    if media:
        extension = telethon.utils.get_extension(media)
        # Recognised type -> "<dir>/<unix timestamp>_<message id><ext>";
        # otherwise only the directory name is passed on.
        target = (
            f"{file}/{int(time.time())}_{message.id}{extension}"
            if "." in extension
            else file
        )
    # NOTE(review): download_media is still called with file=None when there is
    # no media; confirm tg_api.download_media treats that as "nothing to download".
    downloads = await asyncio.gather(
        download_media(message, file=target),  # media file
        download_profile_photo(client, sender),  # sender avatar
    )
    media_path, photo_path = downloads
    logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")
    # Upload both files to go-fast concurrently.
    uploads = await asyncio.gather(upload_file(media_path), upload_file(photo_path))
    upload_media_path, upload_photo_path = uploads
    logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")
    return upload_media_path, upload_photo_path
def _sender_display_name(sender):
    """Return a printable name for a message sender.

    Users are rendered as "first last", skipping empty/None parts so a user
    without a last name is not reported as "first None". Any other entity
    (e.g. a channel posting as itself) is rendered by its ``title``.
    """
    if isinstance(sender, User):
        return " ".join(part for part in (sender.first_name, sender.last_name) if part)
    return sender.title


async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
    """Run one Telegram account and forward its incoming group/channel messages to Kafka.

    Each incoming message from a group or channel NOT listed by ``get_group`` is
    processed:
    - its media and the sender's profile photo go through ``download_resources``;
    - a JSON record is pushed to ``message_topic`` via ``push_kafka``.
    Private chats are ignored.

    :param session_string: Telethon ``StringSession`` string of the account.
    :param api_id: Telegram API id of the account.
    :param api_hash: Telegram API hash of the account.
    :param kp: Kafka producer handed to ``push_kafka``.
    :param message_topic: Kafka topic receiving the message records.
    :return: None; returns only once the client is disconnected.
    """
    session = StringSession(session_string)
    logger.info(f"客户端 {api_id} 启动!!!!")
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    # blacklist_chats=True inverts `chats`: the ids from get_group() are skipped
    # and every other chat is handled.
    @client.on(events.NewMessage(chats=get_group(api_id), incoming=True, blacklist_chats=True))
    async def handler(event):
        """Convert one incoming group/channel message into a record and push it to Kafka.

        :param event: the Telethon ``NewMessage.Event``.
        """
        # The blacklist also lets private chats through. A private chat (User)
        # has no `title`, so building the record raised AttributeError after the
        # media had already been downloaded and uploaded. Skip them up front.
        if event.is_private:
            return
        sender = await event.get_sender()  # a User in groups, the Channel itself in channels
        chat = await event.get_chat()
        # Bot senders are deliberately not filtered out.
        if not sender:  # the message may still exist without a resolvable sender
            logger.info(f" 不存在发送者 群组异常: {chat}")
            sender = chat  # usually a channel
        message = event.message
        other_link = await get_extra_linked(message)  # hyperlinks in the message
        file_list, user_photo = await download_resources(message, event.media, client, sender)
        # reply_to is None for non-replies; getattr also tolerates header types
        # without these fields.
        reply_header = message.reply_to
        data = {
            "group_title": chat.title,
            # Marked peer id: for channels/supergroups this equals
            # -1000000000000 - chat.id; basic groups get -chat.id.
            "group_id": telethon.utils.get_peer_id(chat),
            # Basic groups (types.Chat) have no `username` attribute.
            "username": str(getattr(chat, "username", None)),
            "sender_name": _sender_display_name(sender),
            "sender_id": str(sender.id),
            "media": file_list,
            "sender_photo": user_photo,
            "message_id": message.id,
            # id of the specific message this one replies to
            "reply_to_msg_id": getattr(reply_header, "reply_to_msg_id", None),
            # id of the post a channel discussion thread was started from
            "reply_to_top_id": getattr(reply_header, "reply_to_top_id", None),
            "message_text": message.message,
            "other_link": other_link,  # hyperlinks in the chat and their positions
            "datetime": message.date.strftime("%Y-%m-%d %H:%M:%S"),
        }
        logger.debug(f"client: {api_id} ; data: {data}")
        push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)

    logger.info("client start")
    await client.run_until_disconnected()
async def mul_account():
    """Start one monitoring client per configured account and run them concurrently.

    Accounts are read from ``TG_ROBOT_ACCOUNT_TABLE`` ordered by ``update_time``.
    All clients share one Kafka producer and push to ``TOPIC_DIC["testtelegram"]``.
    A client that fails is logged and does not stop the others.

    :return: None; returns once every client has finished or failed.
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    sql = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by update_time"
    # NOTE(review): getAll may return a falsy non-list (e.g. None) when the table
    # is empty; treat that as "no accounts" — confirm against MysqlPoolClient.
    results = client_mysql.getAll(sql) or []
    kp = SKafka(bootstrap_servers=TOPIC_ADDRESS)
    message_topic = TOPIC_DIC["testtelegram"]
    logger.info(f"链接 kafka {TOPIC_ADDRESS}:{message_topic} 成功")
    api_ids = []
    tasks = []
    for row in results:
        logger.info(row)
        api_ids.append(row["api_id"])
        tasks.append(main(row["session_string"], row["api_id"], row["api_hash"], kp, message_topic))
    logger.info(f"获得任务数量 {len(tasks)}")
    # return_exceptions=True: otherwise the first failing account's exception
    # propagates out of gather and ends the whole run for every account.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    for api_id, outcome in zip(api_ids, outcomes):
        if isinstance(outcome, BaseException):
            logger.error(f"client {api_id} stopped with error: {outcome!r}")
if __name__ == '__main__':
    # asyncio.run creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no current loop is deprecated in recent Python.
    try:
        asyncio.run(mul_account())
    except Exception as e:
        # Log with traceback instead of a bare print so failures show up in the log.
        logger.exception(f"An error occurred: {e}")