You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
343 lines
13 KiB
343 lines
13 KiB
# -*- coding: utf-8 -*-
|
|
import os, sys
|
|
|
|
from telethon.tl.functions.channels import JoinChannelRequest
|
|
|
|
from tg_module.initial_group import join_channel
|
|
from tg_utils.tg_model import GroupFunc
|
|
|
|
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler"
|
|
sys.path.append(root_path)
|
|
|
|
from utils.MysqlData import MysqlPoolClient
|
|
from utils.push_kafka import SKafka
|
|
from telethon import TelegramClient, events
|
|
import json
|
|
from telethon.sessions import StringSession
|
|
from telethon.tl.types import User, ChatAdminRights, Updates, Channel
|
|
import time
|
|
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
|
|
TG_ROBOT_ACCOUNT_TABLE
|
|
import telethon, asyncio
|
|
|
|
from tg_utils.tg_api import *
|
|
from utils.upload_files import upload_file
|
|
|
|
from telethon.sync import TelegramClient as SyncTelegramClient
|
|
|
|
"""
|
|
监控目标群组聊天信息(包括资源下载、头像处理)
|
|
nohup python receive_message.py > rece.log 2>&1 &
|
|
"""
|
|
|
|
|
|
def get_group(api_id):
    """
    Return the chats the message listener should monitor.

    Placeholder implementation: no targets are configured yet, so this always
    returns an empty list and ``api_id`` is not used.
    TODO: load the target chats from SQL.

    :param api_id: Telegram API id of the listening account (currently unused).
    :return: list of chat identifiers to monitor.
    """
    targets = []
    logger.success(f"load group lengths {len(targets)}")
    logger.success(f"load group: {targets}")
    return targets
|
|
|
|
|
|
def push_kafka(kp, data, topic):
    """
    Publish ``data`` to the Kafka ``topic`` through ``kp.sync_producer``.

    A list is forwarded as-is; any other value is wrapped in a one-element
    list first. Failures are logged and swallowed (best effort), so the
    caller never sees an exception from this function.

    :param kp: producer wrapper exposing ``sync_producer(topic, records)``.
    :param data: a single record or a list of records.
    :param topic: destination topic name.
    :return: None
    """
    records = data if isinstance(data, list) else [data]
    try:
        kp.sync_producer(topic, records)
    except Exception as e:
        logger.error(f"数据推送失败:{data}")
        logger.error(f"kafka error: {e}")
|
|
|
|
|
|
async def update_filename(message, media, file="resources"):
    """
    Download a message's media (renaming it when its type is known) and upload it.

    When Telethon can derive an extension for ``media``, the file is saved as
    ``<file>/<unix time>_<message id><ext>``; otherwise ``download_media`` is
    called without a target and its own default location/name applies.

    :param message: Telethon message whose media is downloaded.
    :param media: the message's media object, used only to pick the extension.
    :param file: target directory used when the extension is recognised.
    :return: whatever ``upload_file`` returns for the downloaded file.
    """
    extension = telethon.utils.get_extension(media)
    logger.info(f"捕获文件类型:{extension}")

    if "." not in extension:
        # Unrecognised type: no explicit target path.
        downloaded = await download_media(message)
    else:
        stamp = int(time.time())
        downloaded = await download_media(message, file=f"{file}/{stamp}_{message.id}{extension}")

    return await upload_file(downloaded)
|
|
|
|
|
|
async def download_resources(message, media, client, sender, file="resources"):
    """
    Download a message's media and the sender's avatar, then upload both.

    The download target is a timestamped file name inside ``file`` when the
    media type is recognised, ``file`` itself when it is not, and None when
    the message has no media (``download_media`` is still called, with
    ``file=None``).

    :param message: Telethon message whose media is downloaded.
    :param media: the message's media object; may be None/falsy.
    :param client: connected TelegramClient, used to fetch the avatar.
    :param sender: message sender whose profile photo is downloaded.
    :param file: target directory for the media download.
    :return: tuple ``(upload_media_path, upload_photo_path)`` as returned by
        ``upload_file``.
    """
    target = None
    if media:
        extension = telethon.utils.get_extension(media)
        target = (
            f"{file}/{int(time.time())}_{message.id}{extension}"
            if "." in extension  # recognised type -> timestamped file name
            else file
        )

    # Fetch the media file and the sender's avatar concurrently.
    media_path, photo_path = await asyncio.gather(
        download_media(message, file=target),
        download_profile_photo(client, sender),
    )
    logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")

    # Upload both downloads to go-fast concurrently.
    upload_media_path, upload_photo_path = await asyncio.gather(
        upload_file(media_path),
        upload_file(photo_path),
    )
    logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")

    return upload_media_path, upload_photo_path
|
|
|
|
|
|
async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
    """
    Log in one account and listen for new incoming messages until disconnected.

    Every matching message is flattened into a record (group, sender, text,
    reply ids, hyperlinks, timestamp) and logged at DEBUG level. Media
    download and the Kafka push are currently disabled (commented out below).

    :param session_string: Telethon ``StringSession`` string for the account.
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param kp: Kafka producer wrapper for ``push_kafka`` (unused while the
        push is commented out).
    :param message_topic: Kafka topic for the records (unused while the push
        is commented out).
    :return: None, once the client disconnects.
    """
    session = StringSession(session_string)

    logger.info(f"客户端 {api_id} 启动!!!!")
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    # Telethon resolves ``chats`` to a set and drops every event whose chat is
    # not in it, so an empty list would silently disable the handler. With no
    # configured targets, fall back to None (no chat filter).
    target_chats = get_group(api_id) or None

    @client.on(events.NewMessage(chats=target_chats, incoming=True))
    async def handler(event):
        """
        Build and log a record for one incoming group/channel message.

        Private (one-to-one) chats are skipped: the record needs a group or
        channel title, which user chats do not have.
        """
        if event.is_private:
            return

        # Groups yield a User sender; channels post as the Channel itself.
        sender = await event.get_sender()
        chat = await event.get_chat()

        logger.info(f"测试 sender: {sender}")
        logger.info(f"测试 chat: {chat}")
        if not sender:
            logger.info(f" 不存在发送者 群组异常: {chat}")
            logger.info(f"meesages: {event}")
            logger.info(f"messages: {event.message}")
            return

        if isinstance(sender, User):
            # first_name/last_name may be None; join only the parts that are
            # set so the record never contains the literal text "None".
            sender_name = " ".join(
                part for part in (sender.first_name, sender.last_name) if part
            )
        else:
            sender_name = sender.title

        message = event.message
        other_link = await get_extra_linked(message)  # hyperlinks in the text
        # file_list, user_photo = await download_resources(message, event.media, client, sender)
        file_list, user_photo = None, None
        # Reply header, or None when the message is not a reply.
        reply = message.reply_to

        data = {
            "group_title": chat.title,
            # Marked peer id from Telethon (-100<id> for channels/supergroups,
            # -<id> for basic groups).
            "group_id": event.chat_id,
            # Basic groups have no username attribute; keep the "None" string.
            "username": str(getattr(chat, "username", None)),
            "sender_name": sender_name,
            "sender_id": str(sender.id),
            "media": file_list,
            "sender_photo": user_photo,
            "message_id": message.id,
            # Id of the specific message this one replies to.
            "reply_to_msg_id": getattr(reply, "reply_to_msg_id", None),
            # Thread root id (discussion started on a channel post).
            "reply_to_top_id": getattr(reply, "reply_to_top_id", None),
            "message_text": message.message,
            "other_link": other_link,  # hyperlinks and their positions
            "datetime": message.date.strftime("%Y-%m-%d %H:%M:%S"),
        }

        logger.debug(f"client: {api_id} ; data: {data}")
        # push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)

    logger.info("client start")
    await client.run_until_disconnected()
|
|
|
|
|
|
async def _join_group(session_string, api_id, api_hash, channel="saudovskayaaraviya_rabota"):
    """
    Join a channel/supergroup and report whether members may post text and photos.

    :param session_string: Telethon ``StringSession`` string (logging in from a
        session string rather than a local session file).
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param channel: username/link of the chat to join; defaults to the
        original join-test demo target.
    :return: dict with ``title`` and ``group_id`` (plus ``allow_sending`` when
        the joined chat is a Channel), or None when the join response carried
        no chat.
    """
    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    result = None
    try:
        res = await client(JoinChannelRequest(channel=channel))
        print(res)
        if isinstance(res, Updates) and res.chats:
            chats = res.chats
            joined = chats[0]
            result = {"title": joined.title, "group_id": -1000000000000 - joined.id}
            print(chats)
            if isinstance(joined, Channel):
                # Default permissions that apply to ordinary members.
                rights = joined.default_banned_rights
                result["allow_sending"] = False
                # So far only groups have been seen to carry this value.
                if rights:
                    # A set flag in the banned-rights object means the action is
                    # banned, so posting is allowed only if none of these is set.
                    if not (rights.send_messages or rights.send_photos or rights.send_plain):
                        print("允许发送 图文信息")
                        result["allow_sending"] = True

        print(f"加群返回结果:{result}")
    finally:
        await client.disconnect()

    return result
|
|
|
|
|
|
async def _replay_demo(session_string, api_id, api_hash, channel="MarketingTools2024"):
    """
    Debug demo: resolve a chat and print what this account may post there.

    Sample targets used while testing:
      - "teleSUR_tv": broadcast channel
      - "xeqm3333": megagroup, send_plain=True / send_photos=False, posting works
      - "Pakistanichatgingroup": megagroup
      - "tvhd02"
      - "MarketingTools2024": megagroup where nothing may be posted (default)
      - "ali_the_esq", "appics_official", "saudovskayaaraviya_rabota"
      - a "+<phone number>" string, which resolves to a user

    TODO: decide whether the account may post in the chat, then post.

    :param session_string: Telethon ``StringSession`` string.
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param channel: username/phone/link of the chat to inspect.
    :return: None (results are printed).
    """
    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    try:
        group_entity = await client.get_entity(channel)
        print(group_entity)

        res = {"allow_sending": {}}
        print(res)
        res["allow_sending"] = await GroupFunc.get_group_rights(client, group_entity)
        print(res)

        # Phone numbers resolve to users, which carry no group permissions.
        if "+" not in channel:
            banned_rights = getattr(group_entity, "default_banned_rights", None)
            print(banned_rights)

            if getattr(group_entity, "broadcast", False):
                print(f" 当前群组为 频道 => {channel}")
            else:
                # This account's own permissions reveal an admin ban.
                perms = await client.get_permissions(group_entity, 'me')
                print(perms.is_banned)
                print(banned_rights)
                if banned_rights is not None:
                    print(f" send_plain => {banned_rights.send_plain}")
                    print(f" send_photos => {banned_rights.send_photos}")

        # Sending examples (disabled):
        # await client.send_message(group_entity, "hello")
        # await client.send_file(group_entity, "resources/<file>.jpeg")
    finally:
        await client.disconnect()
|
|
|
|
|
|
async def leave_group(session_string, api_id, api_hash, delay=5):
    """
    Delete every dialog of the account, one every ``delay`` seconds.

    WARNING: this walks *all* dialogs returned by ``get_dialogs()`` (channels,
    groups and private chats alike) and calls ``delete_dialog`` on each one.

    :param session_string: Telethon ``StringSession`` string.
    :param api_id: Telegram API id.
    :param api_hash: Telegram API hash.
    :param delay: seconds to pause between deletions (previously fixed at 5).
    :return: None
    """
    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    try:
        dialogs = await client.get_dialogs()  # all dialogs with ids and names
        print(f" 一共有 =》 {len(dialogs)}")
        for dialog in dialogs:
            print(f"删除 =》 {dialog}")
            await client.delete_dialog(dialog)
            # Non-blocking pause (presumably to stay under Telegram flood
            # limits); time.sleep here would freeze the whole event loop.
            await asyncio.sleep(delay)
    finally:
        await client.disconnect()
|
|
|
|
async def login(session_string=None, api_id=None, api_hash=None):
    """
    Log in with a session string, print the re-serialized session, disconnect.

    Credentials default to ``SESSION_STRING`` / ``API_ID`` / ``API_HASH`` from
    ``config``. They used to be hard-coded here (including a full session
    string, which grants account access); secrets must not live in source
    control, and the previously committed session should be revoked.

    :param session_string: Telethon ``StringSession`` string; None -> config.
    :param api_id: Telegram API id; None -> config.
    :param api_hash: Telegram API hash; None -> config.
    :return: None
    """
    session_string = SESSION_STRING if session_string is None else session_string
    api_id = API_ID if api_id is None else api_id
    api_hash = API_HASH if api_hash is None else api_hash

    session = StringSession(session_string)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    print(client)
    await client.start()

    try:
        print(StringSession.save(client.session))
        print("连接成功")
    finally:
        await client.disconnect()
|
|
|
|
|
|
async def mul_account(accounts=None):
    """
    Run the message listener (``main``) for one or more accounts concurrently.

    :param accounts: iterable of ``(session_string, api_id, api_hash)`` tuples.
        Defaults to the single account configured in ``config``
        (``SESSION_STRING`` / ``API_ID`` / ``API_HASH``). These credentials
        used to be hard-coded here and the committed ones should be revoked.
    :return: None, once every listener has finished.
    """
    if accounts is None:
        accounts = [(SESSION_STRING, API_ID, API_HASH)]

    tasks = [
        main(session_string, api_id, api_hash)
        for session_string, api_id, api_hash in accounts
    ]
    # Other demos that can be scheduled the same way:
    # login(), _replay_demo(...), _join_group(...), leave_group(...)

    logger.info(f"获得任务数量 {len(tasks)}")
    await asyncio.gather(*tasks)
|
|
|
|
|
|
if __name__ == '__main__':
    # asyncio.run creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated and
    # becomes an error in newer Python versions.
    try:
        asyncio.run(mul_account())
    except Exception as e:
        # Log with the traceback instead of printing only the message.
        logger.exception(f"An error occurred: {e}")
|