telegram 群组监控 / 群组功能
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
13 KiB

# coding=utf-8
import os, sys
from telethon.tl.functions.channels import JoinChannelRequest
from tg_module.initial_group import join_channel
from tg_utils.tg_model import GroupFunc
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler"
sys.path.append(root_path)
from utils.MysqlData import MysqlPoolClient
from utils.push_kafka import SKafka
from telethon import TelegramClient, events
import json
from telethon.sessions import StringSession
from telethon.tl.types import User, ChatAdminRights, Updates, Channel
import time
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
TG_ROBOT_ACCOUNT_TABLE
import telethon, asyncio
from tg_utils.tg_api import *
from utils.upload_files import upload_file
from telethon.sync import TelegramClient as SyncTelegramClient
"""
监控目标群组聊天信息(包括资源下载、头像处理)
nohup python receive_message.py > rece.log 2>&1 &
"""
def get_group(api_id):
    """
    Return the groups this client should listen to.

    The list is currently hard-coded (and empty); ``api_id`` is accepted but
    not consulted.
    TODO: could be changed to load the groups from SQL.

    :param api_id: API id of the calling client (currently unused).
    :return: list of groups to monitor.
    """
    monitored = []
    total = len(monitored) if monitored else 0
    logger.success(f"load group lengths {total}")
    logger.success(f"load group: {monitored}")
    return monitored
def push_kafka(kp, data, topic):
    """
    Push one record, or a list of records, to a Kafka topic.

    A single record is wrapped in a one-element list so the producer always
    receives a list. Failures are logged and swallowed (best effort).

    :param kp: producer object exposing ``sync_producer(topic, records)``.
    :param data: a record or a list of records.
    :param topic: destination topic name.
    :return: None
    """
    batch = data if isinstance(data, list) else [data]
    try:
        kp.sync_producer(topic, batch)
    except Exception as err:
        logger.error(f"数据推送失败:{data}")
        logger.error(f"kafka error: {err}")
async def update_filename(message, media, file="resources"):
    """
    Download a message's media under a rule-based file name and upload it.

    When an extension can be derived from ``media`` the file is stored as
    ``<file>/<unix timestamp>_<message id><extension>``; otherwise the
    download helper is called without a target and picks the name itself.

    :param message: message whose media is downloaded.
    :param media: media object used only to derive the file extension.
    :param file: directory prefix for the downloaded file.
    :return: whatever ``upload_file`` returns for the downloaded file.
    """
    extension = telethon.utils.get_extension(media)
    logger.info(f"捕获文件类型:{extension}")
    if "." not in extension:
        # Unknown type: let the download helper choose the destination.
        local_path = await download_media(message)
    else:
        stamp = str(int(time.time()))
        local_path = await download_media(
            message, file=f"{file}/{stamp}_{message.id}{extension}"
        )
    return await upload_file(local_path)  # upload the downloaded file
async def download_resources(message, media, client, sender, file="resources"):
    """
    Download a message's media and the sender's avatar, then upload both.

    The two downloads run concurrently, and so do the two uploads.

    :param message: message whose media is downloaded.
    :param media: media attached to the message; falsy when there is none.
    :param client: Telegram client used for the profile-photo download.
    :param sender: entity whose profile photo is downloaded.
    :param file: directory prefix for the downloaded media.
    :return: tuple ``(uploaded media path, uploaded avatar path)``.
    """
    # Stays None for messages without media.
    # NOTE(review): presumably download_media skips the download when the
    # target is None — confirm against the helper.
    target = None
    if media:
        extension = telethon.utils.get_extension(media)
        if "." not in extension:
            # Unrecognised type: download into the bare directory.
            target = file
        else:
            stamp = str(int(time.time()))
            target = f"{file}/{stamp}_{message.id}{extension}"

    # Fetch the Telegram media and the sender's avatar side by side.
    media_job = download_media(message, file=target)
    avatar_job = download_profile_photo(client, sender)
    media_path, photo_path = await asyncio.gather(media_job, avatar_job)
    logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")

    # Upload both files to go-fast.
    uploaded = await asyncio.gather(upload_file(media_path), upload_file(photo_path))
    upload_media_path, upload_photo_path = uploaded
    logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")
    return upload_media_path, upload_photo_path
def _sender_display_name(sender):
    """
    Build the human-readable name of a message sender.

    :param sender: entity returned by ``event.get_sender()``: a ``User`` for
        group messages, otherwise an entity exposing ``title`` (a channel
        posts as itself).
    :return: first and last name joined by a space, skipping missing parts,
        or the entity's title.
    """
    if isinstance(sender, User):
        # A name part may be None (e.g. a user without a last name); passing
        # it through str() would leak the literal text "None" into the record.
        name_parts = (sender.first_name, sender.last_name)
        return " ".join(part for part in name_parts if part)
    return sender.title


async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
    """
    Start one Telegram client and log every incoming message it receives.

    Each message is turned into a flat record (group, sender, text, reply
    ids, links, timestamp) and logged. Media download and the Kafka push are
    currently disabled, so ``kp`` and ``message_topic`` are not used yet.
    The coroutine runs until the client disconnects.

    :param session_string: Telethon ``StringSession`` string of the account.
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param kp: Kafka producer, reserved for the disabled push step.
    :param message_topic: Kafka topic, reserved for the disabled push step.
    :return: None
    """
    session = StringSession(session_string)
    logger.info(f"客户端 {api_id} 启动!!!!")
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    # NOTE(review): get_group() currently returns an empty list; confirm how
    # Telethon treats an empty ``chats`` filter — it may match no chat at all.
    @client.on(events.NewMessage(chats=get_group(api_id), incoming=True))
    async def handler(event):
        """
        Handle one incoming message.

        The ``chats`` argument of the event filter restricts which groups
        are intercepted.

        :param event: the NewMessage event.
        :return: None
        """
        # In a group the sender is a user; in a channel it is the channel.
        sender = await event.get_sender()
        chat = await event.get_chat()
        logger.info(f"测试 sender: {sender}")
        logger.info(f"测试 chat: {chat}")
        if not sender:
            logger.info(f" 不存在发送者 群组异常: {chat}")
            logger.info(f"meesages: {event}")
            logger.info(f"messages: {event.message}")
            return

        sender_name = _sender_display_name(sender)
        message = event.message
        other_link = await get_extra_linked(message)  # hyperlinks found in the message
        # Media/avatar download is disabled for now:
        # file_list, user_photo = await download_resources(message, event.media, client, sender)
        file_list, user_photo = None, None
        reply = message.reply_to  # MessageReplyHeader object, or None
        data = {
            "group_title": chat.title,
            "group_id": -1000000000000 - chat.id,
            "username": str(chat.username),
            "sender_name": sender_name,
            "sender_id": str(sender.id),
            "media": file_list,
            "sender_photo": user_photo,
            "message_id": message.id,
            # Id of the specific message this one replies to.
            "reply_to_msg_id": reply.reply_to_msg_id if reply else None,
            # Id of the channel post whose discussion thread this belongs to.
            "reply_to_top_id": reply.reply_to_top_id if reply else None,
            "message_text": message.message,
            # Hyperlinks in the message together with their positions.
            "other_link": other_link,
            "datetime": message.date.strftime("%Y-%m-%d %H:%M:%S")
        }
        logger.debug(f"client: {api_id} ; data: {data}")
        # Kafka push is disabled for now:
        # push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)

    if client:
        logger.info("client start")
        await client.run_until_disconnected()
async def _join_group(session_string, api_id, api_hash, channel="saudovskayaaraviya_rabota"):
    """
    Join a channel/group and report whether the account may post there.

    The outcome is printed as a dict with ``title``, ``group_id`` and, for
    ``Channel`` entities, ``allow_sending``.

    :param session_string: Telethon ``StringSession`` string (logging in from
        a local session file is possible too, but not recommended).
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param channel: username of the channel to join; defaults to the demo
        channel used for join testing.
    :return: None
    """
    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    try:
        res = await client(JoinChannelRequest(channel=channel))
        print(res)
        # Only an Updates result that actually carries chats can be inspected.
        if not isinstance(res, Updates) or not res.chats:
            return
        chats = res.chats
        joined = chats[0]
        result = {"title": joined.title, "group_id": -1000000000000 - joined.id}
        print(chats)
        if isinstance(joined, Channel):
            rights = joined.default_banned_rights  # the group's default rights object
            result["allow_sending"] = False
            # So far only groups have been observed to carry this value.
            # These are *banned* rights: a falsy flag means the action is allowed.
            if rights and not (rights.send_messages or rights.send_photos or rights.send_plain):
                print("允许发送 图文信息")
                result["allow_sending"] = True
        print(f"加群返回结果:{result}")
    finally:
        # Release the connection even when the join request fails.
        await client.disconnect()
async def _replay_demo(session_string, api_id, api_hash):
    """
    Demo: inspect whether the account may post in a hard-coded target chat.

    Resolves the target, prints its rights as reported by
    ``GroupFunc.get_group_rights`` and, for username targets, prints the
    default banned rights plus, for non-broadcast chats, the account's own
    ban status. Nothing is sent.

    :param session_string: Telethon ``StringSession`` string (logging in from
        a local session file is possible too, but not recommended).
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :return: None
    """
    client = TelegramClient(
        StringSession(session_string), api_id, api_hash, timeout=60, proxy=PROXY
    )
    await client.start()
    # TODO: implement the "can we post in this group" check and the posting itself.
    # Demo targets tried so far; the last entry is the active one. Other
    # targets used during testing: "xeqm3333" (megagroup, text allowed),
    # "Pakistanichatgingroup", "ali_the_esq", "appics_official",
    # "saudovskayaaraviya_rabota", or a phone number in "+..." form.
    demo_targets = (
        "teleSUR_tv",          # broadcast channel
        "tvhd02",
        "MarketingTools2024",  # group where nothing may be sent
    )
    channel = demo_targets[-1]

    group_entity = await client.get_entity(channel)  # resolve the target entity
    print(group_entity)
    res = {"allow_sending": {}}
    print(res)
    res["allow_sending"] = await GroupFunc.get_group_rights(client, group_entity)
    print(res)

    # Targets containing "+" (phone numbers) skip the group-rights inspection.
    named_chat = "+" not in channel
    if named_chat:
        print(group_entity.default_banned_rights)
    if named_chat and group_entity.broadcast:
        print(f" 当前群组为 频道 => {channel}")
    elif named_chat:
        # Check whether an admin has banned this account from posting.
        own_permissions = await client.get_permissions(group_entity, 'me')
        print(own_permissions.is_banned)
        print(group_entity.default_banned_rights)
        print(f" send_plain => {group_entity.default_banned_rights.send_plain}")
        print(f" send_photos => {group_entity.default_banned_rights.send_photos}")
        # TODO: test the send types — photos (send_photos) / text (send_plain),
        # e.g. client.send_message(group_entity, "hello") or
        # client.send_file(group_entity, file_path).
    await client.disconnect()
async def leave_group(session_string, api_id, api_hash):
    """
    Delete every dialog of the account, pausing 5 seconds between deletions.

    WARNING: this removes ALL dialogs returned by ``get_dialogs()``, not
    only groups.

    :param session_string: Telethon ``StringSession`` string (logging in from
        a local session file is possible too, but not recommended).
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :return: None
    """
    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    try:
        dialogs = await client.get_dialogs()  # every dialog of the account
        print(f" 一共有 =》 {len(dialogs)}")
        for dialog in dialogs:
            print(f"删除 =》 {dialog}")
            await client.delete_dialog(dialog)
            # Non-blocking pause: time.sleep() here would freeze the event
            # loop and stall every other task gathered alongside this one.
            await asyncio.sleep(5)
    finally:
        # Release the connection even if a deletion fails part-way.
        await client.disconnect()
async def login(session_string=None, api_id=None, api_hash=None):
    """
    Log in with a string session and print the (re-)serialised session.

    Credentials come from the arguments, falling back to the values imported
    from ``config``; no account secrets are kept in this file.
    NOTE(review): the credentials previously embedded in this function were
    exposed in source and should be rotated.
    NOTE(review): assumes the config values describe a single account —
    confirm against config.

    :param session_string: Telethon ``StringSession`` string; defaults to
        ``SESSION_STRING`` from config.
    :param api_id: Telegram API id; defaults to ``API_ID`` from config.
    :param api_hash: Telegram API hash; defaults to ``API_HASH`` from config.
    :return: None
    """
    if session_string is None:
        session_string = SESSION_STRING
    if api_id is None:
        api_id = API_ID
    if api_hash is None:
        api_hash = API_HASH

    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    print(client)
    await client.start()
    try:
        # The printed session string grants full account access: treat the
        # output as a secret.
        print(StringSession.save(client.session))
        print("连接成功")
    finally:
        await client.disconnect()
async def mul_account():
    """
    Build the list of account tasks and run them concurrently.

    Only the message monitor (``main``) is scheduled at the moment. The
    account credentials are taken from ``config`` instead of being written
    into this file.
    NOTE(review): assumes ``SESSION_STRING`` / ``API_ID`` / ``API_HASH`` in
    config describe a single account — confirm against config.

    :return: None
    """
    tasks = [main(SESSION_STRING, API_ID, API_HASH)]
    # Other entry points that can be scheduled the same way:
    # tasks.append(login())
    # tasks.append(_replay_demo(SESSION_STRING, API_ID, API_HASH))
    # tasks.append(_join_group(SESSION_STRING, API_ID, API_HASH))
    # tasks.append(leave_group(SESSION_STRING, API_ID, API_HASH))
    logger.info(f"获得任务数量 {len(tasks)}")
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    # asyncio.run() creates and closes the event loop itself; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    try:
        asyncio.run(mul_account())
    except Exception as e:
        # Top-level boundary: log with the traceback so the failure can be
        # diagnosed from the log.
        logger.exception(f"An error occurred: {e}")