Browse Source

initial

master
steve.gao 1 year ago
commit
ea90fa9ff4
  1. 3
      .idea/.gitignore
  2. 6
      .idea/inspectionProfiles/profiles_settings.xml
  3. 7
      .idea/misc.xml
  4. 8
      .idea/modules.xml
  5. 11
      .idea/telegram_crawler.iml
  6. 6
      .idea/vcs.xml
  7. 12
      README.md
  8. 56
      clean.py
  9. 74
      config.py
  10. 33
      requirements.txt
  11. 127
      server.py
  12. 342
      test.py
  13. 1
      tg_module/__init__.py
  14. 1329
      tg_module/group_ids.json
  15. 161
      tg_module/group_link.txt
  16. 370
      tg_module/initial_group.py
  17. 219
      tg_module/receive_message.py
  18. 0
      tg_utils/__init__.py
  19. 119
      tg_utils/tg_api.py
  20. 143
      tg_utils/tg_model.py
  21. 111
      utils/Logger.py
  22. 197
      utils/MysqlData.py
  23. 0
      utils/__init__.py
  24. 50
      utils/push_kafka.py
  25. 22
      utils/tools.py
  26. 56
      utils/upload_files.py
  27. 12
      uwsgi.ini

3
.idea/.gitignore

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

6
.idea/inspectionProfiles/profiles_settings.xml

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (telegram-crawler)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>
</project>

8
.idea/modules.xml

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/telegram_crawler.iml" filepath="$PROJECT_DIR$/.idea/telegram_crawler.iml" />
</modules>
</component>
</project>

11
.idea/telegram_crawler.iml

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.8 (telegram-crawler)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="pytest" />
</component>
</module>

6
.idea/vcs.xml

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

12
README.md

@ -0,0 +1,12 @@
# telegram-crawler-downloader
telegram采集
- telegram 加群服务
  - server.py:服务启动入口
- telegram 群组监控
  - tg_module/receive_message.py:负责对目标账号进行监控、上传文件、推送kafka
  - tg_module/initial_group.py:负责对目标账号的所有群组进行更新
- 资源清理:
  - clean.py:定时清理下载资源文件

56
clean.py

@ -0,0 +1,56 @@
import os
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram-crawler')[0] + "telegram-crawler"
import time
from loguru import logger
import shutil
"""
0 0 * * * cd /opt/crawl/telegram/telegram-crawler && /root/anaconda3/envs/python3.8/bin/python clean.py > /dev/null 2>&1
cd /opt/crawl/telegram/telegram-crawler && /root/anaconda3/envs/python3.8/bin/python clean.py > /dev/null 2>&1
"""
def release_resources(days=3):
    """
    Delete downloaded resources that are older than ``days`` days.

    Every entry directly inside ``tg_module/resources`` and
    ``tg_module/profile`` (both relative to the current working directory)
    is examined. Entries are processed oldest first by ``os.path.getctime``
    and the scan stops at the first entry that is still inside the
    retention window.

    :param days: retention period in days; older entries are removed
    :return: None
    """
    resources_dir = 'tg_module/resources'
    profile_dir = 'tg_module/profile'
    # Collect every entry of both directories.
    resources_files = [os.path.join(resources_dir, f) for f in os.listdir(resources_dir)]
    profile_files = [os.path.join(profile_dir, f) for f in os.listdir(profile_dir)]
    files = resources_files + profile_files
    now = time.time()
    # Retention window in seconds. The previous code overwrote this value
    # with ``days * 60 * 60``, which silently turned "days" into hours.
    delay = days * 24 * 60 * 60
    logger.info(f"获取数量共:{len(files)};resources: {len(resources_files)}, profile: {len(profile_files)}")
    limited_time = now - delay
    # Oldest first, so the loop can stop at the first recent entry.
    files.sort(key=os.path.getctime)
    count = 0
    for path in files:
        if os.path.getctime(path) >= limited_time:
            # Everything after this entry is newer; nothing left to delete.
            break
        if os.path.isfile(path) or os.path.islink(path):
            # Plain files and symlinks are removed directly.
            os.remove(path)
            count += 1
        else:
            # Directories are removed together with their content
            # (they are not included in ``count``).
            shutil.rmtree(path)
            logger.debug(f"删除了一个文件夹: {path}")
    logger.info(f"删除文件数量:{count}")
# Script entry point: run one cleanup pass with the default retention.
if __name__ == '__main__':
    release_resources()

74
config.py

@ -0,0 +1,74 @@
from utils.Logger import MyLogger
"""
"""
# **************************************************************
# Telegram account configuration. TODO: multiple accounts should be read from SQL.
# NOTE(review): API credentials and a session string are committed here in
# plain text - consider loading them from the environment or a secret store.
SESSION_NAME = "session_name"
API_ID = 28340634
API_HASH = '5c54ebfc2729b32bc6f49e4b34747f47'
SESSION_STRING = "1BVtsOG8Buw04Jkp3pNiWll0L2AzXwTGUCNpDnxPvUagdG7iLL_oaHoiepFG3LqF0gnZUe60-IXMn-MCGjQ_tTPjZP0VDeSl_0EmnKjZvaRxvX8pPKbmbLqZmvJjPmplEbel1CosRzCC-naBrnoE6vGetrzAvawzY8OA8_R5VNmGXCaCEPKumC9Safh7qLUntW70e2V4GYqRzwoYkZPdZzQKcfYufKrt3cfnTrCESE4_EDTRotGcqRm6IaAx9XLCrFsDwIPNJYf5ij6-Uzq8cx6qAD1IpSLy7hDehH1gfkM4N4Wem4LduJVlmHnV5Q4IHmiXrE2YAO4DTUpKV0_UMJRDDLkjB8XM="
# Switch for routing Telegram traffic through the local SOCKS5 proxy.
ENABLE_PROXY = True
# (scheme, host, port) tuple when the proxy is enabled, otherwise None.
PROXY = ("socks5", '127.0.0.1', 1080) if ENABLE_PROXY else None
# **************************************************************
# Kafka: logical data kind -> topic name.
TOPIC_DIC = {
    # Main posts
    "post": "video_post_crawl_data",
    # Users
    "user": "video_user_crawl_data",
    # Replies
    "reply": "video_reply_crawl_data",
    # Fans / following lists
    "fans": "video_fans_crawl_data",
    # Likes / shares
    "like": "video_like_crawl_data",
    # Topic for data that still needs to be downloaded
    "download": "video_need_download",
    # Priority tasks
    "priority": "file_crawl_undownload_priority",
    # Ordinary tasks
    "common": "file_crawl_undownload_common",
    # Test tasks
    # "testtelegram": "test_telegram",
    "testtelegram": "test_telegram_1"
}
TOPIC_ADDRESS = ""  # Kafka address (empty in this commit)
# **************************************************************
# Upload URL
UPLOAD_URL = "http://x/upload"
# **************************************************************
# Logging: shared logger instance imported by the other modules
logger = MyLogger()
# **************************************************************
# mysql
# class CRAWLER_DB_CONF(object):
# DBHOST = 'si-1'
# DBPORT = 3306
# DBUSER = 'crawl'
# DBPWD = 'crawl123'
# DBNAME = ''
# DBCHAR = 'utf8mb4'
# DB_FULL_NAME = ""
class CRAWLER_DB_CONF:
    """MySQL connection settings handed to ``MysqlPoolClient``."""
    # NOTE(review): the password is committed in plain text.
    DBHOST = '127.0.0.1'
    DBPORT = 3306
    DBUSER = 'root'
    DBPWD = '1q2w3e4r'
    DBNAME = 'tg_message'
    DBCHAR = 'utf8mb4'
    DB_FULL_NAME = "tg_message"


# Table holding the robot (Telegram account) rows.
TG_ROBOT_ACCOUNT_TABLE = 'tg_robot_account'
# Table holding the per-account group rows.
TG_ROBOT_ACCOUNT_GROUP_TABLE = 'tg_robot_group'

33
requirements.txt

@ -0,0 +1,33 @@
aiohttp==3.9.1
aiosignal==1.3.1
asgiref==3.7.2
async-timeout==4.0.3
attrs==23.2.0
blinker==1.7.0
bottle==0.12.25
click==8.1.7
cryptg==0.4.0
DBUtils==3.0.3
Flask==3.0.0
frozenlist==1.4.1
idna==3.6
importlib-metadata==7.0.1
itsdangerous==2.1.2
Jinja2==3.1.2
kafka-python==2.0.2
loguru==0.7.2
MarkupSafe==2.1.3
multidict==6.0.4
Paste==3.7.1
pyaes==1.6.1
pyasn1==0.5.1
PyMySQL==1.1.0
python-socks==2.4.4
rsa==4.9
six==1.16.0
Telethon==1.33.1
typing_extensions==4.9.0
uWSGI==2.0.23
Werkzeug==3.0.1
yarl==1.9.4
zipp==3.17.0

127
server.py

@ -0,0 +1,127 @@
import json
import traceback
from flask import Flask, request
from loguru import logger
from tg_module.initial_group import run, _replay, _detail
app = Flask(__name__)
"""
pip install 'flask[async]': flask
uwsgi -d --ini uwsgi.ini
uwsgi --stop
uwsgi --reload
"""
@app.route("/join")
async def index():
    """
    Join the Telegram group/channel given by the ``telegram_link`` query argument.

    A link whose second-to-last path segment is ``joinchat`` is joined as a
    private invite; otherwise the last path segment is used as the public
    channel name.

    :return: JSON string ``{"code": ..., "content": ...}``:
        * 200 - ``run`` returned a dict (content is that dict)
        * 203 - the result text contains the "successfully requested to join" message
        * 500 - invalid link, any other result, or an exception
    """
    state_203 = "You have successfully requested to join this chat or channel"
    try:
        telegram_link = request.args.get('telegram_link')
        if telegram_link and "https://t.me/" in telegram_link:
            link = telegram_link.strip()
            parts = link.split("/")
            # ".../joinchat/<hash>" links are private invites.
            is_private = len(parts) > 4 and parts[-2] == "joinchat"
            channel = parts[-1]
            logger.info(f"获得当前群链接:{link}, 获得当前channel:{channel}")
            res = await run(telegram_link, channel, is_private=is_private)
            if isinstance(res, dict):
                ret = {"code": 200, "content": res}
            elif isinstance(res, str):
                if state_203 in res:
                    ret = {"code": 203, "content": state_203}
                else:
                    ret = {"code": 500, "content": res}
            else:
                # ``run`` can hand back a non-string failure value (e.g. False);
                # a substring test on it would raise TypeError, so report it
                # explicitly in a JSON-serialisable form.
                ret = {"code": 500, "content": str(res)}
        else:
            ret = {"code": 500, "content": "telegram_link error"}
    except Exception as e:
        traceback.print_exc()
        logger.error(f"server error: {e}")
        ret = {"code": 500, "content": str(e)}
    logger.info(ret)
    return json.dumps(ret, ensure_ascii=False)
@app.route("/replay", methods=["POST"])
async def replay():
    """
    Send messages to a public group named by a JSON POST body.

    The body is expected to carry ``telegram_link`` and ``messages``; private
    (``joinchat``) links are rejected.

    :return: JSON string ``{"code": ..., "content": ...}`` - 200 when
        ``_replay`` reports "发送成功", 400 for any other status, 500 for an
        invalid/private link or an exception.
    """
    try:
        payload = json.loads(request.get_data())
        telegram_link = payload.get("telegram_link")
        messages = payload.get("messages")
        # Expected shape of ``messages``:
        # {"message": "", "media_type": "photos/videos/plain/docs", "media_link": ""}
        if not (telegram_link and "https://t.me/" in telegram_link):
            ret = {"code": 500, "content": "telegram_link error"}
        else:
            link = telegram_link.strip()
            parts = link.split("/")
            if len(parts) > 4 and parts[-2] == "joinchat":  # TODO: private groups are not supported yet
                ret = {"code": 500, "content": "私密群暂时不支持"}
            else:
                channel = parts[-1]
                logger.info(f"获得当前群链接:{link}, 获得当前channel:{channel}")
                status = await _replay(telegram_link, channel, messages)
                code = 200 if status["send_message_status"] == "发送成功" else 400
                ret = {"code": code, "content": status}
    except Exception as e:
        traceback.print_exc()
        logger.error(f"server error: {e}")
        ret = {"code": 500, "content": str(e)}
    logger.info(ret)
    return json.dumps(ret, ensure_ascii=False)
@app.route("/detail")
async def get_detail():
    """
    Return the details of the public group given by the ``telegram_link`` query argument.

    Private (``joinchat``) links are rejected.

    :return: JSON string ``{"code": ..., "content": ...}`` - 200 with the
        value returned by ``_detail``, 500 for an invalid/private link or an
        exception.
    """
    try:
        telegram_link = request.args.get('telegram_link')
        if not (telegram_link and "https://t.me/" in telegram_link):
            ret = {"code": 500, "content": "telegram_link error"}
        else:
            link = telegram_link.strip()
            parts = link.split("/")
            if len(parts) > 4 and parts[-2] == "joinchat":  # TODO: private groups are not supported yet
                ret = {"code": 500, "content": "私密群暂时不支持"}
            else:
                channel = parts[-1]
                logger.info(f"获得当前群链接:{link}, 获得当前channel:{channel}")
                group_detail = await _detail(channel, file="tg_module/profile")
                ret = {"code": 200, "content": group_detail}
    except Exception as e:
        traceback.print_exc()
        logger.error(f"server error: {e}")
        ret = {"code": 500, "content": str(e)}
    logger.info(ret)
    return json.dumps(ret, ensure_ascii=False)
# Development entry point: serve on all interfaces, port 8080.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

342
test.py

@ -0,0 +1,342 @@
# code=utf-8
import os, sys
from telethon.tl.functions.channels import JoinChannelRequest
from tg_module.initial_group import join_channel
from tg_utils.tg_model import GroupFunc
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram-crawler')[0] + "telegram-crawler"
sys.path.append(root_path)
from utils.MysqlData import MysqlPoolClient
from utils.push_kafka import SKafka
from telethon import TelegramClient, events
import json
from telethon.sessions import StringSession
from telethon.tl.types import User, ChatAdminRights, Updates, Channel
import time
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
TG_ROBOT_ACCOUNT_TABLE
import telethon, asyncio
from tg_utils.tg_api import *
from utils.upload_files import upload_file
from telethon.sync import TelegramClient as SyncTelegramClient
"""
nohup python receive_message.py > rece.log 2>&1 &
"""
def get_group(api_id):
    """
    Return the group ids to monitor for the given account.

    TODO: read them from SQL; for now the list is always empty and
    ``api_id`` is not used.

    :param api_id: Telegram API id of the account
    :return: list of group ids
    """
    group_ids = []
    group_count = len(group_ids) if group_ids else 0
    logger.success(f"load group lengths {group_count}")
    logger.success(f"load group: {group_ids}")
    return group_ids
def push_kafka(kp, data, topic):
    """
    Push data to Kafka through ``kp.sync_producer``.

    A single item is wrapped in a list; a list is sent as-is. Any exception
    is logged and swallowed.

    :param kp: producer object exposing ``sync_producer(topic, data_list)``
    :param data: one item or a list of items
    :param topic: target topic name
    :return: None
    """
    try:
        batch = data if isinstance(data, list) else [data]
        kp.sync_producer(topic, batch)
    except Exception as e:
        logger.error(f"数据推送失败:{data}")
        logger.error(f"kafka error: {e}")
async def update_filename(message, media, file="resources"):
    """
    Download the media of a message and upload the downloaded file.

    When an extension can be derived from ``media`` the file is saved as
    ``<file>/<unix_ts>_<message.id><ext>``; otherwise ``download_media`` is
    called without a target path.

    :param message: message whose media is downloaded
    :param media: media object used to determine the extension
    :param file: target directory
    :return: value returned by ``upload_file``
    """
    extension = telethon.utils.get_extension(media)
    logger.info(f"捕获文件类型:{extension}")
    if "." in extension:
        # Known extension: build a unique file name.
        stamp = str(int(time.time()))
        target = f"{file}/{stamp}_{message.id}{extension}"
        filename = await download_media(message, file=target)
    else:
        filename = await download_media(message)
    return await upload_file(filename)  # upload the downloaded file
async def download_resources(message, media, client, sender, file="resources"):
    """
    Download a message's media and the sender's profile photo, then upload both.

    :param message: message whose media is downloaded
    :param media: media object of the message (falsy when there is none)
    :param client: Telegram client passed to ``download_profile_photo``
    :param sender: sender whose profile photo is downloaded
    :param file: target directory for the media file
    :return: tuple ``(upload_media_path, upload_photo_path)`` as returned by ``upload_file``
    """
    new_file_name = None  # stays None for messages without media
    if media:
        file_type = telethon.utils.get_extension(media)
        # logger.info(f"捕获文件类型:{file_type}")
        if "." in file_type:  # recognised type: build a unique file name
            time_stamp = str(int(time.time()))
            new_file_name = f"{file}/{time_stamp}_{message.id}{file_type}"
        else:
            new_file_name = file
    # Download the Telegram media file and the avatar concurrently.
    # NOTE(review): presumably download_media copes with file=None - confirm in tg_utils.tg_api.
    media_path, photo_path = await asyncio.gather(
        download_media(message, file=new_file_name),  # media file download
        download_profile_photo(client, sender)  # avatar download
    )
    logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")
    # Upload both files (go-fast per the log message below).
    upload_media_path, upload_photo_path = await asyncio.gather(
        upload_file(media_path),
        upload_file(photo_path)
    )
    logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")
    return upload_media_path, upload_photo_path
async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
    """
    Start a Telegram client and log every incoming message of the monitored groups.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :param kp: Kafka producer (only referenced by the commented-out push below)
    :param message_topic: Kafka topic (only referenced by the commented-out push below)
    :return: None; runs until the client disconnects
    """
    session = StringSession(session_string)
    # session = session_string
    logger.info(f"客户端 {api_id} 启动!!!!")
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()

    @client.on(events.NewMessage(chats=get_group(api_id), incoming=True))
    async def handler(event):
        """
        Handle one incoming message; ``chats`` above restricts the sources.

        :param event: NewMessage event
        :return: None
        """
        sender = await event.get_sender()  # a user for groups, the channel itself for channels
        chat = await event.get_chat()
        logger.info(f"测试 sender: {sender}")
        logger.info(f"测试 chat: {chat}")
        if not sender:
            # No sender available: log the event and skip it.
            logger.info(f" 不存在发送者 群组异常: {chat}")
            logger.info(f"meesages: {event}")
            logger.info(f"messages: {event.message}")
            return
        if isinstance(sender, User):
            sender_name = str(sender.first_name) + ' ' + str(sender.last_name)
        else:  # for a channel the sender is the channel object
            sender_name = sender.title
        message = event.message
        media = event.media
        other_link = await get_extra_linked(message)  # hyperlinks contained in the message
        file_list, user_photo = await download_resources(message, media, client, sender)
        message_text = message.message
        date = message.date
        replay = message.reply_to  # MessageReplyHeader object
        data = {
            "group_title": chat.title,
            "group_id": -1000000000000 - chat.id,
            "username": str(chat.username),
            "sender_name": sender_name,
            "sender_id": str(sender.id),
            "media": file_list,
            "sender_photo": user_photo,
            "message_id": message.id,
            "reply_to_msg_id": replay.reply_to_msg_id if replay else None,  # reply to one specific message
            "reply_to_top_id": replay.reply_to_top_id if replay else None,  # discussion thread started on a channel post
            "message_text": message_text,
            "other_link": other_link,  # hyperlinks in the chat and their positions
            "datetime": date.strftime("%Y-%m-%d %H:%M:%S")
        }
        logger.debug(f"client: {api_id} ; data: {data}")
        # logger.debug(f"message: {message}")
        # push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)

    if client:
        logger.info("client start")
        await client.run_until_disconnected()
async def _join_group(session_string, api_id, api_hash):
    """
    Demo: join one hard-coded public channel and print what was learned.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :return: None
    """
    session = StringSession(session_string)  # log in from a session string (a local session file also works, not recommended)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    try:
        channel = "saudovskayaaraviya_rabota"  # join demo
        res = await client(JoinChannelRequest(channel=channel))
        print(res)
        if isinstance(res, Updates):
            chats = res.chats
            result = {"title": chats[0].title, "group_id": -1000000000000 - chats[0].id}
            print(chats)
            if isinstance(chats[0], Channel):
                rights = chats[0].default_banned_rights  # default banned-rights object of the group
                result["allow_sending"] = False
                # So far only groups were observed to carry this value.
                if rights and not (rights.send_messages or rights.send_photos or rights.send_plain):
                    print("允许发送 图文信息")
                    # Nothing is banned, so sending is allowed (the flag used
                    # to be set to False here, contradicting the message).
                    result["allow_sending"] = True
            print(f"加群返回结果:{result}")
    finally:
        # Disconnect like the sibling demo coroutines do.
        await client.disconnect()
async def _replay_demo(session_string, api_id, api_hash):
    """
    Demo: print the sending permissions of one hard-coded group/channel.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :return: None
    """
    session = StringSession(session_string)  # log in from a session string (a local session file also works, not recommended)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    # TODO: check whether messages can be sent to the group, then send them
    channel = "teleSUR_tv"  # channel demo (broadcast)
    # channel = "xeqm3333"  # group demo: megagroup send_plain=True/send_photos=False, sending works
    # channel = "Pakistanichatgingroup"  # group demo
    channel = "tvhd02"
    # channel = 'MarketingTools2024'  # group demo: nothing may be sent
    # channel = 'ali_the_esq'
    # channel = "+86 184 4310 4914"
    # channel = "appics_official"
    # channel = "saudovskayaaraviya_rabota"
    channel = "MarketingTools2024"  # last assignment wins
    group_entity = await client.get_entity(channel)  # resolve the group entity
    print(group_entity)
    res = {"allow_sending": {}}
    print(res)
    res["allow_sending"] = await GroupFunc.get_group_rights(client, group_entity)
    print(res)
    if "+" not in channel:
        # print(f"群组信息 {group_entity}")
        print(group_entity.default_banned_rights)
        if group_entity.broadcast:
            print(f" 当前群组为 频道 => {channel}")
        else:
            a = await client.get_permissions(group_entity, 'me')  # check whether an admin banned this account
            print(a.is_banned)
            # print(group_entity.admin_rights)
            print(group_entity.default_banned_rights)
            print(f" send_plain => {group_entity.default_banned_rights.send_plain}")
            print(f" send_photos => {group_entity.default_banned_rights.send_photos}")
            # if group_entity.default_banned_rights:
            #     # TODO: test send types: photo send_photos / text send_plain
            #     pass
    # Sending a file (disabled experiments):
    # file_path = "resources/b0a26d56a34a1840fe68e5314b3a70e5.jpeg"
    # with open(file_path, "r", encoding="utf-8") as f:
    #     content = f.read()
    # content = 'http://172.18.1.180:9980/group17/default/20240507/17/07/3/image.jpg'
    # res = await client.send_message(group_entity, "<b> ni hao wa </b>", file=content)
    # res = await client.send_file(group_entity, file_path)
    # f.close()
    # res = await client.send_message(group_entity, file=file_path)
    # res = await client.send_message(group_entity, "hello")
    # print(res)
    await client.disconnect()
async def leave_group(session_string, api_id, api_hash):
    """
    Delete (leave) every dialog of the account, pausing 5 seconds between each.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :return: None
    """
    session = StringSession(session_string)  # log in from a session string (a local session file also works, not recommended)
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    dialogs = await client.get_dialogs()  # all dialogs of the account
    print(f" 一共有 =》 {len(dialogs)}")
    for dialog in dialogs:
        print(f"删除 =》 {dialog}")
        await client.delete_dialog(dialog)
        # Non-blocking pause: time.sleep() would stall the whole event loop,
        # including the client's own network tasks.
        await asyncio.sleep(5)
    await client.disconnect()
async def login():
    """
    Demo: log in with a hard-coded account row and print its session string.

    NOTE(review): the account credentials below are committed in plain text.

    :return: None
    """
    # The former standalone ``api_id``/``api_hash`` locals were never read;
    # the values used for the login come from ``data``.
    data = {'id': 2, 'phone': '18846824798', 'api_id': '28340634',
            'api_hash': '5c54ebfc2729b32bc6f49e4b34747f47',
            'session_string': '1BVtsOMUBu22sAF4QtYuqDLre0vn4JxLy8utbzMvJ7kd6q6ZKNW_lIwUZEu38uPiRsz7uWiGK9gQURYPdMCLSjnWejTZ1pbdPT4kWSYfl4gN2iGb5G_Ib4mQpW4XgUErpiyIiCaTfG9ph59bz4Y-sw2rDhHSPVoL6PttS_OnQI5PKbVKSQLmO8A6OGoauNyf1_MlmPv1MdQUkA6Ep7fgXcKWKUUZ3YWkmIojWDzz02505D0Jcn-mQ3ED6zESToZafZsyj9Ktcf2NUzJA44DXMy2RcOfNjyyFMafl1kNrLAJ2uoz5vwzM0adw38AYhxVOH6VYbBm8ZnKf7T-9EQkwsXMKZL1FFJ8A=',
            'plateform': 'DeskTop', 'total_group': 274}
    session = StringSession(data["session_string"])
    client = TelegramClient(session, data["api_id"], data["api_hash"], timeout=60, proxy=PROXY)
    print(client)
    await client.start()
    # client = SyncTelegramClient("session_name", data["api_id"], data["api_hash"], timeout=60)
    # await client.start()
    print(StringSession.save(client.session))
    print("连接成功")
    await client.disconnect()
async def mul_account():
    """
    Run the selected demo coroutine(s) for one hard-coded account.

    NOTE(review): the account credentials below are committed in plain text.

    :return: None
    """
    api_id = "22955009"
    api_hash = "c5581d1ed5880f6e0b1734dad1d4a10d"
    # session_string = "demo1"
    session_string = "1BVtsOIEBuwG1-0k5xGzlopl4G7ghAhBPorz1HcaSkfcuDEsYKSJKQ0nCLYbMTT7yplnfJHEYXR-rGY5FoEyrAYsW86obngGwxLDpl9b9IuGhxCDlFSo_O2AIPw3Duf8tc5DewfNGqZ7U8CbpEjFcpEaRRy23Z93DVZtcYHBLp6vLh5iLndKXanW4vxArJODjVklAKwxqDD5LGixvoeP5p9W1VJAeihJxqEl0UHL12dF4T7MYcdhW-ylA4NvCtgeIaqiVwZ1VuVyiyLNYaMrpZZfdmfOGkYapp-1ubYE8XKAQ8jau3XnWCLvk50w6L9DaWp8PSdQ7RRZf5G2swSyurHCK6quAVfA="
    # Only the message listener is active; the other demos can be swapped in:
    #   login()
    #   _replay_demo(session_string, api_id, api_hash)
    #   _join_group(session_string, api_id, api_hash)
    #   leave_group(session_string, api_id, api_hash)
    tasks = [main(session_string, api_id, api_hash)]
    logger.info(f"获得任务数量 {len(tasks)}")
    await asyncio.gather(*tasks)
# Script entry point: drive mul_account() on the event loop and print any
# exception instead of letting it propagate.
if __name__ == '__main__':
    # main()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(mul_account())
    except Exception as e:
        print(f"An error occurred: {e}")
    # mul_account()
    # pass

1
tg_module/__init__.py

@ -0,0 +1 @@

1329
tg_module/group_ids.json
File diff suppressed because it is too large
View File

161
tg_module/group_link.txt

@ -0,0 +1,161 @@
https://t.me/alaviaat
https://t.me/AlaviShahed
https://t.me/marmoozat
https://t.me/Bangladesh88019
https://t.me/babyruksan_hdooloporn_hdadultn
https://t.me/newhindimm
https://t.me/akhbarkhorasanjonubi
https://t.me/irna_birjand
https://t.me/vj_pd
https://t.me/javazenokari
https://t.me/javazenokari_ir
https://t.me/taskin_org
https://t.me/taskin_dell
https://t.me/ibsgrouppp
https://t.me/asharqbusiness
https://t.me/Currency_gold
https://t.me/siyasigroups
https://t.me/Raadeillhad
https://t.me/AnarakNews
https://t.me/anarakico
https://t.me/dailyofethnicjokes
https://t.me/upozit
https://t.me/iranarchive
https://t.me/ModiriatMali_Valiasr
https://t.me/VazinArgham
https://t.me/turk_fiilm
https://t.me/zrumbesh
https://t.me/Shahilibrarymoradabad2
https://t.me/siyasteimamali
https://t.me/eslamfekhe
https://t.me/MsjdAboAhmed
https://t.me/khaneh_omran
https://t.me/SeparatismoDoBrasil
https://t.me/concomunal
https://t.me/neuropsicopedagogiaautismo
https://t.me/dibujosdedospersonasaburridas
https://t.me/personasykt
https://t.me/comercio
https://t.me/FotosMDM
https://t.me/AsambleaCuba
https://t.me/asambleaiberia
https://t.me/rorarealm
https://t.me/monarquiaconstitucional
https://t.me/krest_isusika
https://t.me/sakha_resistance
https://t.me/Separatizm
https://t.me/inflationwatch
https://t.me/inflyacia
https://t.me/VVPCRIMEA
https://t.me/mr_tskhovrebov
https://t.me/anarchyplus
https://t.me/TheHolyWorld
https://t.me/nasiliebozeyagod
https://t.me/vsu_smertyy
https://t.me/uysozlarortamasjid
https://t.me/narod_party
https://t.me/Olga_Finanse
https://t.me/gwsnx
https://t.me/zxckursed123
https://t.me/RegentCoin
https://t.me/regent_efimova_maria
https://t.me/xiusharegent
https://t.me/egyptfortouragent
https://t.me/travelask_egypt
https://t.me/extremistybelarus
https://t.me/i_am_not_extremist
https://t.me/alimusssss
https://t.me/ali_the_esq
https://t.me/islamdag
https://t.me/bjiofficial
https://t.me/laxydra_1
https://t.me/la_hud_ra
https://t.me/MedninaDAO
https://t.me/MedinaMekka
https://t.me/MidlifeCrisisRussia
https://t.me/ekzeget
https://t.me/m_christianity
https://t.me/love_fore_the_world
https://t.me/nonmatrix_christianity
https://t.me/istinaISXR
https://t.me/ispglobal
https://t.me/katya_golden
https://t.me/bangladeshislamichhatrashibir
https://t.me/malsugenov_islam
https://t.me/SamimulIslamShahed
https://t.me/islamC
https://t.me/islamt
https://t.me/bakhshi_uzbekistan
https://t.me/Sila_Zakona_AB
https://t.me/sila_zakona72
https://t.me/fan_kulturnaya_revoluciya
https://t.me/INDPRcommunistChat
https://t.me/pltnkvachanal
https://t.me/SPPR_Channel
https://t.me/adaemon5
https://t.me/HotchyDenegRF
https://t.me/oqzyfn5J3dQ4N2Ey
https://t.me/der_avo
https://t.me/GEP_211_rahimov
https://t.me/ger_201_202
https://t.me/russianeconomicdevelopment
https://t.me/InvestCryptoHub
https://t.me/Mintrans_uz
https://t.me/solundar04
https://t.me/polisazerbaycan
https://t.me/officialmordovia
https://t.me/Azerbaijan_MOD
https://t.me/customschannel
https://t.me/respublika
https://t.me/sredniyklasslife
https://t.me/srednyclassmyz
https://t.me/afrosoyoz
https://t.me/konstantachannel
https://t.me/novinki_filmy
https://t.me/killerpiterse
https://t.me/detectiveaudioboks2025
https://t.me/Boeviki_Filmy_Novinki
https://t.me/akniga50
https://t.me/nihongo_soudann
https://t.me/soudanre
https://t.me/soudanbanat
https://t.me/soudaniamel
https://t.me/akistani_girl_leaked_viral_mm_0
https://t.me/BurhanStoreee
https://t.me/SheikhBurhanuddin
https://t.me/fud_pages
https://t.me/burhan_uz
https://t.me/burhan9
https://t.me/malmelmusic
https://t.me/MalMetr
https://t.me/eomemeoj
https://t.me/malme1
https://t.me/ZauraJhon
https://t.me/IndepRepSecHos
https://t.me/nrespublika
https://t.me/dages4taness
https://t.me/MordecaiAndRigbyToken
https://t.me/orda_mordora
https://t.me/mordorblog
https://t.me/Mordstorex
https://t.me/mordashkinson
https://t.me/MorDi
https://t.me/inflationbrand
https://t.me/inflation_shock
https://t.me/inflationresource
https://t.me/fkinflationtoken
https://t.me/inflationsfromlemur
https://t.me/inflation0
https://t.me/Asesinatos_tristeza
https://t.me/Saudi_Arabien
https://t.me/Dubai_Rencontre_Dauphin
https://t.me/saudovskaya_araviya
https://t.me/welcometosaudi
https://t.me/saydovskaya_araviya
https://t.me/inSaudia
https://t.me/lifeinksa
https://t.me/saudovskayaaraviya_rabota
https://t.me/BloodshedWar
https://t.me/SPL_Liga1
https://t.me/avtorrrrrrrrr
https://t.me/regentescirculo

370
tg_module/initial_group.py

@ -0,0 +1,370 @@
import random
from pymysql.converters import escape_string
import json, os
import sys
from telethon.tl.functions.messages import GetFullChatRequest, ImportChatInviteRequest
from tg_utils.tg_api import send_messages
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram-crawler')[0] + "telegram-crawler"
sys.path.append(root_path)
from tg_utils.tg_model import GroupFunc
from utils.tools import get_group_hash, get_data_from_sql, get_data_from_sql1
from telethon.tl.types import ChannelParticipantsAdmins, Updates, PeerChannel, Channel, Message
from config import PROXY, logger, CRAWLER_DB_CONF, TG_ROBOT_ACCOUNT_TABLE, TG_ROBOT_ACCOUNT_GROUP_TABLE
import asyncio
from telethon.tl.functions.channels import JoinChannelRequest
from utils.MysqlData import MysqlPoolClient
"""
"""
async def get_groups(client, api_hash):
    """
    Store every channel-type dialog of the account in the group table and
    update the account's ``total_group`` counter.

    Dialogs without any username are logged and skipped.

    :param client: logged-in Telegram client
    :param api_hash: API hash of the account; used as the key in both tables
    :return: None
    """
    group_list = []
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    dialogs = await client.get_dialogs()  # all dialogs of the account
    groups = [dialog for dialog in dialogs if dialog.is_channel]  # keep dialogs flagged is_channel
    for group in groups:
        group_ids = {
            "group_id": group.id,  # id of the monitored group or channel
            "group_name": group.name,  # name of the group or channel
            "participants_count": group.entity.participants_count,  # member count
            "group_owner": "",  # filled with the entity's username below; may stay empty
            "is_group": group.is_group,  # group or channel
            "description": "",  # description
            "discussion": {}  # related discussion-group info
        }
        # res = await client.get_participants(group, filter=ChannelParticipantsAdmins)  # admins
        # print(len(res), res[0])
        if group.entity.username:
            group_ids["group_owner"] = group.entity.username
        elif group.entity.usernames:
            group_ids["group_owner"] = group.entity.usernames[0].username
        if group_ids["group_owner"]:
            detail = await GroupFunc.get_related_group(client, group_ids["group_owner"])
            group_ids.update(detail)
            pass
        if not group_ids["group_owner"]:
            # Without a username no t.me link can be built.
            logger.error(f"不存在username: {group_ids}")
            continue
        group_list.append(group_ids)
        logger.info(f"Group: {group_ids}")
        telegram_link = f"https://t.me/{group_ids['group_owner']}"
        # Upsert the group row.
        # NOTE(review): only group.name is escaped; the other values are interpolated as-is.
        sql = f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) " \
              "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id)" \
              % (api_hash, group.id, telegram_link, get_group_hash(f"{api_hash+telegram_link}"), escape_string(group.name), 1)
        logger.info(sql)
        client_mysql.getOne(sql)
        # break
    # Record how many groups were stored for this account.
    sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (len(group_list), api_hash)
    client_mysql.getOne(sql_robot)
    logger.info(f"get group lengths is {len(group_list)}")
async def join_channel(client, channel, is_private=False):
    """
    Join a group or channel.

    :param client: logged-in Telegram client
    :param channel: public channel name, or the invite hash when ``is_private`` is set
    :param is_private: join through ``ImportChatInviteRequest`` (joinchat link)
        instead of ``JoinChannelRequest``
    :return: dict with ``title``, ``group_id`` and ``allow_sending`` on success,
        otherwise ``False``
    """
    if is_private:
        res = await client(ImportChatInviteRequest(channel))
    else:
        res = await client(JoinChannelRequest(channel=channel))
    logger.info(res)  # Updates.chats[0].username
    if res and isinstance(res, Updates):
        chat = res.chats[0]
        # res = await get_group(client, res.chats[0])
        if isinstance(chat, Channel):
            res = {"title": chat.title, "group_id": -1000000000000 - res.chats[0].id, "allow_sending": None}
            is_banned = await GroupFunc.is_banned(client, chat)  # check whether this account is banned
            if not is_banned:
                allow_status = await GroupFunc.get_group_rights(client, chat)
                res["allow_sending"] = allow_status if allow_status else None
            logger.info(f"返回的权限信息 ! => {res}")
            return res
    return False
async def leave_channel(client, channel):
    """
    Leave a group or channel by deleting its dialog.

    This method can be used as a user and as a bot. However,
    bots will only be able to use it to leave groups and channels
    (trying to delete a private conversation will do nothing).

    :param client: logged-in Telegram client
    :param channel: group/channel to leave
    :return: dict with ``title`` and ``group_id`` when an ``Updates`` object
        comes back, otherwise ``False``
    """
    outcome = await client.delete_dialog(channel)  # leave groups and channels
    logger.info(outcome)  # Updates.chats[0].username
    if not (outcome and isinstance(outcome, Updates)):
        return False
    return {"title": outcome.chats[0].title, "group_id": -1000000000000 - outcome.chats[0].id}
async def main(session_string, api_id, api_hash, channel="", is_private=False, is_leave=False):
    """
    Log in with Telethon and join or leave one group/channel.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :param channel: group/channel to join or leave
    :param is_private: join through a private invite
    :param is_leave: leave the group instead of joining it
    :return: result of ``join_channel`` / ``leave_channel``, or the exception
        text when the operation raised
    """
    client = await GroupFunc.login(session_string, api_id, api_hash)
    # print(StringSession.save(client.session))  # dump the session as a string
    try:
        action = (
            leave_channel(client, channel)  # leave
            if is_leave
            else join_channel(client, channel, is_private=is_private)  # join
        )
        res = await action
    except Exception as e:
        logger.error(f"join error: {e}")
        res = str(e)
    # Disconnect before handing the result back.
    await client.disconnect()
    return res
async def init_group(session_string, api_id, api_hash):
    """
    Log in with one account, refresh its stored group list, then disconnect.

    :param session_string: Telethon string session used to log in
    :param api_id: Telegram API id
    :param api_hash: Telegram API hash
    :return: None
    """
    # Create the Telethon client and authenticate.
    tg_client = await GroupFunc.login(session_string, api_id, api_hash)
    # Collect and store the account's groups.
    await get_groups(tg_client, api_hash)
    await tg_client.disconnect()
async def mul_account():
    """
    Refresh the group list of every robot account, one account after another.

    Accounts are read from the account table ordered by ``update_time``.

    :return: None
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    sql = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by update_time"
    accounts = client_mysql.getAll(sql)
    # Accounts are handled sequentially (a concurrent asyncio.gather variant
    # was left disabled by the author).
    for account in accounts:
        api_id, api_hash = account["api_id"], account["api_hash"]
        session_string = account["session_string"]
        print(account)
        await init_group(session_string, api_id, api_hash)
def update_sql(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a join attempt.

    * ``res`` is a dict (joined): upsert the group row with status 1 and
      increase the account's ``total_group`` by one.
    * ``res`` is text containing the "successfully requested to join" message:
      upsert the group row with status 0 (empty id and title).
    * anything else (error text, ``False``...): only touch the account's
      ``update_time``.

    :param res: value returned by ``main`` for the join attempt
    :param telegram_link: group link used as ``group_link``
    :param group: row providing ``api_hash`` and ``total_group``
    :param client_mysql: MySQL client used to execute the statements
    :return: None
    """
    state_203 = "You have successfully requested to join this chat or channel"
    joined = isinstance(res, dict)
    # Only test for the marker on real strings: a non-string failure value
    # such as False would make the ``in`` test raise TypeError.
    pending = isinstance(res, str) and state_203 in res
    if joined or pending:
        group_status = 1 if joined else 0
        group_id = res["group_id"] if joined else ""
        group_title = escape_string(res["title"]) if joined else ""
        # The link originates from a request parameter, so it is escaped
        # before being embedded in the statement; the hash still uses the raw link.
        sql = f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) " \
              "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id) " \
              % (group["api_hash"], group_id, escape_string(telegram_link),
                 get_group_hash(f"{group['api_hash'] + telegram_link}"),
                 group_title, group_status)
        client_mysql.getOne(sql)
        if joined:  # count the group only after a successful join
            sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
                group["total_group"] + 1, group["api_hash"])
            client_mysql.getOne(sql_robot)
    else:
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
def update_sql1(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a leave attempt.

    When ``res`` is a dict (left successfully) the group row is set to status
    0 and the account's ``total_group`` is decreased by one; otherwise only
    the account's ``update_time`` is touched.

    :param res: value returned by ``main`` for the leave attempt
    :param telegram_link: group link identifying the group row
    :param group: row providing ``api_hash`` and ``total_group``
    :param client_mysql: MySQL client used to execute the statements
    :return: None
    """
    if isinstance(res, dict):  # left successfully
        # Mark the group row as no longer joined. The link originates from a
        # request parameter, so it is escaped before being embedded.
        sql = f"UPDATE {TG_ROBOT_ACCOUNT_GROUP_TABLE} set group_status=0 where group_link='%s'" % (
            escape_string(telegram_link))
        client_mysql.getOne(sql)
        # Update the counter in the account table.
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
            group["total_group"] - 1, group["api_hash"])
        client_mysql.getOne(sql_robot)
    else:
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
async def run(telegram_link, channel, is_private=False, is_leave=False):
    """
    Join or leave a group and record the result in the database.

    :param telegram_link: group link, used as the lookup key in the group table
    :param channel: channel identifier forwarded to ``main``
    :param is_private: forwarded to ``main`` when joining
    :param is_leave: when true, leave the group instead of joining it
    :return: when leaving, a status string; when joining, the value returned
        by ``main`` or, if the group is already joined, a dict with its
        stored ``title`` and ``group_id``
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    # The link is bound as a parameter rather than formatted into the SQL.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link=%s and a.api_hash=b.api_hash"
    count, group = client_mysql.getOne(sql_group, (telegram_link,))

    if is_leave:
        if not group:
            return "该群不存在"
        if group["group_status"] == 0:
            # Never joined successfully, or already left.
            return "退群成功"
        # Leave the group, then update the database.
        res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_leave=True)
        logger.info(f"退群,账号信息:{group['api_id']}")
        update_sql1(res, telegram_link, group, client_mysql)
        return "退群成功"

    if group:
        if group["group_status"] == 0:
            # Status 0: the earlier attempt did not end in membership, so
            # retry with the account that made it.
            res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
            logger.info(f"重新加群,账号信息:{group['api_id']}")
            update_sql(res, telegram_link, group, client_mysql)
            return res
        return {"title": group["group_title"], "group_id": group["group_id"]}

    # Unknown link: use the first account ordered by total_group, update_time.
    sql_robot = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by total_group, update_time"
    count, group = client_mysql.getOne(sql_robot)
    logger.info(f"获取账号:{group['api_id']}")
    res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
    update_sql(res, telegram_link, group, client_mysql)
    return res
async def _replay(telegram_link, channel, message_list):
    """
    Send a message to a group that one of the accounts has already joined.

    :param telegram_link: group link, used as the lookup key in the group table
    :param channel: channel identifier resolved through ``client.get_entity``
    :param message_list: message description forwarded to ``send_messages``
    :return: dict with ``send_message_status`` (status text) and
        ``allow_sending`` (the rights reported for the group, or None)
    """
    result = {
        "send_message_status": "",
        "allow_sending": None
    }
    # Look up the group and the account that joined it. The link is bound
    # as a parameter rather than formatted into the SQL.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link=%s and a.api_hash=b.api_hash"
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    count, group = client_mysql.getOne(sql_group, (telegram_link,))

    if not (group and group["group_status"]):
        result["send_message_status"] = "群组并未添加,检查群组状态"
        return result

    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        group_entity = await client.get_entity(channel)
        if await GroupFunc.is_banned(client, group_entity):
            result["send_message_status"] = "群组管理员禁止发送消息"
        else:
            allow_status = await GroupFunc.get_group_rights(client, group_entity)
            result["allow_sending"] = allow_status if allow_status else None
            if allow_status:
                # A Message object means the send succeeded; anything else
                # is passed through as the status.
                mess_status = await send_messages(client, group_entity, message_list, allow_status)
                logger.info(mess_status)
                result["send_message_status"] = "发送成功" if isinstance(mess_status, Message) else mess_status
            else:
                result["send_message_status"] = "当前群组不支持发送消息"
    finally:
        # Disconnect even when one of the Telegram calls above raises.
        await client.disconnect()
    return result
async def _detail(telegram_link, file="profile"):
    """
    Fetch the details of a group using a randomly chosen account.

    :param telegram_link: group identifier forwarded to ``GroupFunc.get_group``
    :param file: forwarded to ``GroupFunc.get_group`` as its ``file`` argument
    :return: the value returned by ``GroupFunc.get_group``
    """
    sql_group = f"select api_id, session_string, api_hash from {TG_ROBOT_ACCOUNT_TABLE}"
    group_list = get_data_from_sql1(sql_group)
    group = random.choice(group_list)  # pick one account at random
    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        group_detail = await GroupFunc.get_group(client, telegram_link, file=file)
    finally:
        # Disconnect even when the lookup raises.
        await client.disconnect()
    logger.info(f"当前使用账号: {group['api_id']} => {telegram_link} => {group_detail}")
    return group_detail
if __name__ == "__main__":
    # Manual run: fetch the details of one sample group.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(_detail("rtnoticias"))
    except Exception as exc:
        print(f"An error occurred: {exc}")

219
tg_module/receive_message.py

@ -0,0 +1,219 @@
# code=utf-8
import os, sys
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram-crawler')[0] + "telegram-crawler"
sys.path.append(root_path)
from utils.MysqlData import MysqlPoolClient
from utils.push_kafka import SKafka
from telethon import TelegramClient, events
import json
from telethon.sessions import StringSession
from telethon.tl.types import User
import time
from config import logger, SESSION_STRING, API_ID, API_HASH, PROXY, TOPIC_ADDRESS, TOPIC_DIC, CRAWLER_DB_CONF, \
TG_ROBOT_ACCOUNT_TABLE
import telethon, asyncio
from tg_utils.tg_api import *
from utils.upload_files import upload_file
"""
nohup python receive_message.py > rece.log 2>&1 &
"""
def get_group(api_id):
    """
    Return the chat ids handed to the message listener.

    group_ids.json is still read and filtered, but the result is then
    replaced by the hard-coded list below.
    TODO: read the ids from SQL.

    :param api_id: unused
    :return: list of chat ids
    """
    with open("group_ids.json", 'r') as handle:
        entries = json.load(handle)
    from_file = []
    for entry in entries:
        if entry["group_id"] and not entry["is_group"]:
            from_file.append(entry["group_id"])
    # Left groups apparently still deliver events, so these ids are used
    # as a blacklist of chats that should no longer be monitored.
    blacklist = [
        -1001440842385, -1001121481396, -1001643896726,
        -1001156230593, -1001964495180, -1001510876777,
        -1001878335287, -1001360128984, -1001461057234,
        -1001144411934, -1001790944354, -1001759695404,
        -1001643896726, -1002001492389
    ]
    total = len(blacklist) if blacklist else 0
    logger.success(f"load group lengths {total}")
    return blacklist
def push_kafka(kp, data, topic):
    """
    Push one record or a list of records to Kafka, logging any failure.

    :param kp: producer wrapper exposing ``sync_producer(topic, messages)``
    :param data: a single record or a list of records
    :param topic: destination topic
    :return: None
    """
    # The producer always receives a list.
    batch = data if isinstance(data, list) else [data]
    try:
        kp.sync_producer(topic, batch)
    except Exception as err:
        logger.error(f"数据推送失败:{data}")
        logger.error(f"kafka error: {err}")
async def update_filename(message, media, file="resources"):
    """
    Download the media of a message and upload it.

    :param message: message whose media is downloaded
    :param media: media object used to determine the file extension
    :param file: directory used when the extension is recognised
    :return: the value returned by ``upload_file``
    """
    extension = telethon.utils.get_extension(media)
    logger.info(f"捕获文件类型:{extension}")
    if "." in extension:
        # Known extension: save as <file>/<timestamp>_<message id><extension>.
        stamp = str(int(time.time()))
        local_path = await download_media(message, file=f"{file}/{stamp}_{message.id}{extension}")
    else:
        # Unknown extension: use download_media's default target.
        local_path = await download_media(message)
    return await upload_file(local_path)
async def download_resources(message, media, client, sender, file="resources"):
    """
    Download a message's media and its sender's profile photo, then upload both.

    :param message: message whose media is downloaded
    :param media: media object of the message; falsy when there is none
    :param client: client used to download the profile photo
    :param sender: entity whose profile photo is downloaded
    :param file: directory for the downloaded media
    :return: tuple ``(uploaded media path, uploaded photo path)``
    """
    target = None  # stays None for messages without media, so nothing is downloaded
    if media:
        extension = telethon.utils.get_extension(media)
        if "." in extension:
            # Known extension: <file>/<timestamp>_<message id><extension>.
            target = f"{file}/{str(int(time.time()))}_{message.id}{extension}"
        else:
            target = file
    # Download the media and the profile photo concurrently.
    downloads = await asyncio.gather(
        download_media(message, file=target),
        download_profile_photo(client, sender),
    )
    media_path, photo_path = downloads
    logger.info(f"下载媒体文件路径:{media_path} ; 下载头像文件路径:{photo_path} ;")
    # Upload both files concurrently.
    uploads = await asyncio.gather(upload_file(media_path), upload_file(photo_path))
    upload_media_path, upload_photo_path = uploads
    logger.info(f"go-fast媒体文件路径:{upload_media_path} ; go-fast头像路径:{upload_photo_path} ;")
    return upload_media_path, upload_photo_path
async def main(session_string, api_id, api_hash, kp=None, message_topic=""):
    """
    Start one Telegram client and forward every incoming message to Kafka.

    :param session_string: string session used to log in
    :param api_id: Telegram API id of the account
    :param api_hash: Telegram API hash of the account
    :param kp: producer wrapper handed to ``push_kafka``
    :param message_topic: Kafka topic the messages are pushed to
    :return: None; runs until the client disconnects
    """
    session = StringSession(session_string)
    logger.info(f"客户端 {api_id} 启动!!!!")
    client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
    await client.start()
    # NOTE(review): blacklist_chats=True presumably makes the ids returned by
    # get_group an exclusion list rather than a filter — confirm against Telethon.
    @client.on(events.NewMessage(chats=get_group(api_id), incoming=True, blacklist_chats=True))
    async def handler(event):
        """
        Build a record from the incoming event and push it to Kafka.

        :param event: the NewMessage event
        :return: None
        """
        sender = await event.get_sender()  # a user in groups, the channel itself in channels
        chat = await event.get_chat()
        # if not sender or (hasattr(sender, "bot") and sender.bot): # 群组机器人过滤
        # logger.info(f"过滤机器人测试: {sender}")
        # return
        if not sender:  # a message can arrive without a sender
            logger.info(f" 不存在发送者 群组异常: {chat}")
            # logger.info(f"meesages: {event}")
            # logger.info(f"messages: {event.message}")
            sender = chat  # fall back to the chat (usually a channel)
        if isinstance(sender, User):
            sender_name = str(sender.first_name) + ' ' + str(sender.last_name)
        else:  # channel posts are sent in the name of the channel
            sender_name = sender.title
        message = event.message
        media = event.media
        other_link = await get_extra_linked(message)  # text hyperlinks found in the message
        file_list, user_photo = await download_resources(message, media, client, sender)
        message_text = message.message
        date = message.date
        replay = message.reply_to  # reply header object, or a falsy value when not a reply
        data = {
            "group_title": chat.title,
            # presumably converts chat.id to the -100-prefixed id form — confirm
            "group_id": -1000000000000 - chat.id,
            "username": str(chat.username),
            "sender_name": sender_name,
            "sender_id": str(sender.id),
            "media": file_list,
            "sender_photo": user_photo,
            "message_id": message.id,
            "reply_to_msg_id": replay.reply_to_msg_id if replay else None,  # id of the message being replied to
            "reply_to_top_id": replay.reply_to_top_id if replay else None,  # top message of the discussion thread, for channel discussion groups
            "message_text": message_text,
            "other_link": other_link,  # hyperlinks in the text together with their positions
            "datetime": date.strftime("%Y-%m-%d %H:%M:%S")
        }
        logger.debug(f"client: {api_id} ; data: {data}")
        # logger.debug(f"message: {message}")
        push_kafka(kp, json.dumps(data, ensure_ascii=False), message_topic)
    if client:
        logger.info("client start")
        await client.run_until_disconnected()
async def mul_account():
    """
    Start one listener per account stored in the database and run them together.

    :return: None; completes when every listener has finished
    """
    db = MysqlPoolClient(CRAWLER_DB_CONF)
    accounts = db.getAll(f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by update_time")
    kp = SKafka(bootstrap_servers=TOPIC_ADDRESS)
    message_topic = TOPIC_DIC["testtelegram"]
    logger.info(f"链接 kafka {TOPIC_ADDRESS}:{message_topic} 成功")
    tasks = []
    for account in accounts:
        logger.info(account)
        listener = main(account["session_string"], account["api_id"], account["api_hash"], kp, message_topic)
        tasks.append(listener)
    logger.info(f"获得任务数量 {len(tasks)}")
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    # Run the listeners of all accounts until they stop.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(mul_account())
    except Exception as exc:
        print(f"An error occurred: {exc}")

0
tg_utils/__init__.py

119
tg_utils/tg_api.py

@ -0,0 +1,119 @@
import time
from telethon.tl.types import MessageEntityTextUrl, ChannelFull
from utils.upload_files import download_file
async def download_media(message, file="resources"):
    """
    Download the media attached to a message.

    :param message: message exposing an awaitable ``download_media(file=...)``
    :param file: download target; a falsy value skips the download
    :return: the value returned by ``message.download_media``, or None when
        the download is skipped
    """
    if not file:
        return None
    # Albums arrive as several messages, so each call handles one file.
    return await message.download_media(file=file)
async def download_profile_photo(client, sender, file="profile"):
    """
    Download the profile photo of an entity when it has one.

    :param client: client exposing an awaitable ``download_profile_photo``
    :param sender: entity; a ``ChannelFull`` is checked through ``chat_photo``,
        anything else through ``photo``
    :param file: download target
    :return: the downloaded path, or None when the entity has no photo
    """
    # The photo attribute is falsy for the default avatar.
    photo = sender.chat_photo if isinstance(sender, ChannelFull) else sender.photo
    if not photo:
        return None
    return await client.download_profile_photo(sender, file=file)
async def get_messages(client, chat, message_id):
    """
    Fetch the message(s) with the given id from a chat.

    :param client: client exposing an awaitable ``get_messages(chat, ids=...)``
    :param chat: chat to read from
    :param message_id: id (or ids) to fetch
    :return: whatever ``client.get_messages`` returns
    """
    fetched = await client.get_messages(chat, ids=message_id)
    print("message_1337!,", fetched)
    return fetched
async def get_extra_linked(message):
    """
    Collect the text hyperlinks carried by a message.

    :param message: message exposing ``entities``
    :return: list of dicts with ``url``, ``offset`` and ``length``, one per
        ``MessageEntityTextUrl`` entity; empty when there are none
    """
    # The linked text itself is not returned, so the message text is no
    # longer sliced: that slice was unused and raised when the text was None.
    return [
        dict(url=entity.url, offset=entity.offset, length=entity.length)
        for entity in (message.entities or [])
        if isinstance(entity, MessageEntityTextUrl)
    ]
async def send_messages(client, user, messages, allow_status, file="resources"):
    """
    Send text and/or one media file to a chat, as far as the rights allow.

    :param client: client exposing awaitable ``send_message`` / ``send_file``
    :param user: destination entity
    :param messages: dict with ``message`` (text), ``media_type``
        (``photos`` / ``videos`` / ``docs``) and ``media_link`` (URL)
    :param allow_status: dict of ``send_plain`` / ``send_photos`` /
        ``send_videos`` / ``send_docs`` flags
    :param file: directory the media is downloaded to before sending
    :return: ``""`` when ``messages`` is empty, ``"权限不支持"`` when nothing
        may be sent, otherwise the value returned by the client call
    """
    if not messages:
        return ""

    # Extension used for the downloaded file; docs keep the link's file name.
    extension_by_type = {
        "photos": ".jpg",
        "videos": ".mp4",
        "docs": "",
    }
    content = messages.get("message")
    media_kind = messages.get("media_type")
    media_link = messages.get("media_link")

    can_send_text = bool(content) and bool(allow_status.get("send_plain"))
    # Only known media types with a link count as media. A "plain" media
    # type used to pass the rights check and then raise KeyError below.
    can_send_media = (
        media_kind in extension_by_type
        and bool(media_link)
        and bool(allow_status.get(f"send_{media_kind}"))
    )

    file_path = ""
    if can_send_media:
        time_stamp = str(int(time.time()))
        suffix = extension_by_type[media_kind] or media_link.split("/")[-1]
        # Download first, then send the local file.
        file_path = await download_file(media_link, f"{file}/{time_stamp}{suffix}")

    if can_send_text and can_send_media:  # text with media
        return await client.send_message(user, content, file=file_path)
    if can_send_text:  # text only
        return await client.send_message(user, content)
    if can_send_media:  # media only
        return await client.send_file(user, file_path)
    return "权限不支持"

143
tg_utils/tg_model.py

@ -0,0 +1,143 @@
from telethon import TelegramClient, functions
from telethon.sessions import StringSession
from config import logger, PROXY, TG_ROBOT_ACCOUNT_TABLE
from tg_utils.tg_api import download_profile_photo
from utils.upload_files import upload_file
class GroupFunc():
    """
    Static helpers around a Telegram client: login, session refresh, group
    details, default rights and ban status.
    """

    @staticmethod
    async def login(session_string, api_id, api_hash):
        """
        Log in with a string session.

        :param session_string: serialized session
        :param api_id: Telegram API id
        :param api_hash: Telegram API hash
        :return: the started client
        """
        session = StringSession(session_string)
        client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
        await client.start()
        return client

    @staticmethod
    async def get_login(api_id, api_hash, client_mysql):
        """
        Log in again to obtain a fresh string session and store it.

        :param api_id: Telegram API id
        :param api_hash: Telegram API hash, also passed as the session name
        :param client_mysql: database client exposing ``getOne(sql, param)``
        :return: the new string session
        """
        client = TelegramClient(api_hash, api_id, api_hash, timeout=60, proxy=PROXY)
        await client.start()
        try:
            string_session = StringSession.save(client.session)
            # NOTE(review): this writes the session string to the log; it
            # looks like a credential — confirm that this is acceptable.
            logger.info(f"sss: {string_session}")
            if string_session:
                # Values are bound by the driver, not formatted into the SQL.
                sql = f"update {TG_ROBOT_ACCOUNT_TABLE} set session_string=%s where api_hash=%s"
                client_mysql.getOne(sql, (string_session, api_hash))
        finally:
            # Disconnect even when saving the session fails.
            await client.disconnect()
        return string_session

    @staticmethod
    async def get_related_group(client, group_name):
        """
        Fetch a channel's description and its linked discussion group.

        :param client: connected client
        :param group_name: channel passed to ``GetFullChannelRequest``
        :return: dict with ``description`` and ``discussion``; ``discussion``
            holds a ``group_id`` when a linked chat exists, else it is empty
        """
        detail = {
            "description": "",
            "discussion": {}
        }
        full = await client(functions.channels.GetFullChannelRequest(channel=group_name))
        full_channel = full.full_chat
        detail["description"] = full_channel.about
        discussion_id = full_channel.linked_chat_id
        if discussion_id:
            detail["discussion"] = {
                "group_id": -1000000000000 - discussion_id
            }
        return detail

    @staticmethod
    async def get_group(client, channel, file="profile"):
        """
        Fetch the details of a channel and upload its profile photo.

        :param client: connected client
        :param channel: channel passed to ``GetFullChannelRequest``
        :param file: download target for the profile photo
        :return: dict with ``group_id``, ``title``, ``photo``, ``description``
            and ``participants_count``
        """
        group = {
            "group_id": "",
            "title": "",
            "photo": "",
            "description": "",
            "participants_count": "",
        }
        full = await client(functions.channels.GetFullChannelRequest(channel=channel))
        full_channel = full.full_chat
        group["description"] = full_channel.about
        group["group_id"] = -1000000000000 - full_channel.id
        group["title"] = full.chats[0].title
        group["participants_count"] = full_channel.participants_count
        path = await download_profile_photo(client, full_channel, file=file)
        group["photo"] = await upload_file(path)
        return group

    @staticmethod
    async def get_group_rights(client, channel):
        """
        Report which kinds of content the group's default rights allow.

        :param client: connected client, used only when ``channel`` is a str
        :param channel: channel name or an already resolved entity
        :return: False when the entity has no ``default_banned_rights``,
            otherwise a dict of ``send_photos`` / ``send_videos`` /
            ``send_plain`` / ``send_docs`` booleans (all False when sending
            messages is banned altogether)
        """
        if isinstance(channel, str):
            group_entity = await client.get_entity(channel)
        else:
            group_entity = channel
        rights = group_entity.default_banned_rights
        if not rights:
            # The original author observed this value only on groups.
            return False
        # Log the resolved entity's title: ``channel`` may be a plain str,
        # which has no ``title`` attribute.
        logger.info(f"群组权限: {group_entity.title} => {rights}")
        _allow_status = {
            "send_photos": False,
            "send_videos": False,
            "send_plain": False,
            "send_docs": False,
        }
        if not rights.send_messages:
            # The rights object lists what is banned, hence the negation.
            _allow_status = {
                "send_photos": not rights.send_photos,
                "send_videos": not rights.send_videos,
                "send_plain": not rights.send_plain,
                "send_docs": not rights.send_docs,
            }
        return _allow_status

    @staticmethod
    async def is_banned(client, channel):
        """
        Tell whether the logged-in account is banned in the channel.

        :param client: connected client
        :param channel: channel to check
        :return: ``is_banned`` of the account's permissions in the channel
        """
        permissions = await client.get_permissions(channel, 'me')
        logger.info(f"是否存在权限 ! => {permissions.is_banned}")
        return permissions.is_banned

111
utils/Logger.py

@ -0,0 +1,111 @@
import os
from functools import wraps
from time import perf_counter
from loguru import logger
# from loguru._logger import Logger
class MyLogger:
    """
    Thin wrapper that configures the shared loguru logger and proxies
    attribute access to it.
    """

    def __init__(self, log_dir='logs', max_size=20, retention='7 days'):
        self.log_dir = log_dir
        self.max_size = max_size
        self.retention = retention
        self.logger = self.configure_logger()

    def configure_logger(self):
        """
        Create the log directory and register the sinks.

        Returns:
            the configured loguru logger
        """
        os.makedirs(self.log_dir, exist_ok=True)
        common = dict(
            level="DEBUG",
            enqueue=True,
            backtrace=True,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        )
        # File sink named after the date, rotated by size.
        daily_sink = f"{self.log_dir}/{{time:YYYY-MM-DD}}.log"
        logger.add(
            sink=daily_sink,
            rotation=f"{self.max_size} MB",
            retention=self.retention,
            **common
        )
        # NOTE(review): a callable sink is called with each message and its
        # return value is presumably ignored, so this may write nothing — confirm.
        logger.add(sink=self.get_log_path, **common)
        return logger

    def get_log_path(self, message: str) -> str:
        """
        Build the path of the per-level log file for a message.

        Args:
            message: log message exposing ``record["level"].name``

        Returns:
            ``<log_dir>/<level name in lower case>.log``
        """
        level_name = message.record["level"].name.lower()
        return os.path.join(self.log_dir, f"{level_name}.log")

    def __getattr__(self, level: str):
        # Unknown attributes (info, error, ...) are looked up on the logger.
        return getattr(self.logger, level)

    def log_decorator(self, msg="快看,异常了,别唧唧哇哇,块排查"):
        """
        Build a decorator that logs the call, the result and the duration.

        An exception raised by the wrapped function is logged and swallowed;
        the wrapper then returns None.

        Args:
            msg: text appended to the function name when an exception is logged

        Returns:
            the decorator
        """

        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                self.logger.info('-----------分割线-----------')
                self.logger.info(f'调用 {func.__name__} args: {args}; kwargs:{kwargs}')
                started = perf_counter()
                try:
                    outcome = func(*args, **kwargs)
                    elapsed = perf_counter() - started
                    self.logger.info(f"{func.__name__} 返回结果:{outcome}, 耗时:{elapsed:4f}s")
                    return outcome
                except Exception:
                    self.logger.exception(f"{func.__name__}: {msg}")
                    self.logger.info("-----------分割线-----------")

            return wrapper

        return decorator
if __name__ == '__main__':
    # Manual smoke test: emit one message per level, many times.
    log = MyLogger()
    samples = (
        ("error", '错误信息'),
        ("critical", '严重错误信息'),
        ("debug", '调试信息'),
        ("info", '普通信息'),
        ("success", '成功信息'),
        ("warning", '警告信息'),
    )
    for _ in range(1000):
        for level, text in samples:
            getattr(log, level)(text)

197
utils/MysqlData.py

@ -0,0 +1,197 @@
import threading
import traceback
import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
class MysqlPoolClient(object):
    """
    MySQL client backed by a connection pool shared per database.

    Each instance takes one connection from the pool and keeps one cursor.
    ``dispose`` (also called from ``__del__``) commits or rolls back and
    closes both the cursor and the connection.
    """
    # One pool per database, keyed by ``db_conf.DB_FULL_NAME``.
    __pool = {}
    # Guards pool creation, connection checkout and disposal.
    __lock = threading.Lock()

    def __init__(self, db_conf):
        """
        Take a connection from the pool and open a cursor on it.

        :param db_conf: configuration object read by ``__getConn``
        """
        # ``with`` releases the lock even when connecting fails; a held lock
        # would otherwise block every later constructor and ``dispose``.
        with MysqlPoolClient.__lock:
            self._conn = MysqlPoolClient.__getConn(db_conf)
            self._cursor = self._conn.cursor()

    def __del__(self):
        self.dispose()

    @staticmethod
    def __getConn(db_conf):
        """
        @summary: get a connection from the pool, creating the pool on first use
        @param db_conf: object with DB_FULL_NAME, DBHOST, DBPORT, DBUSER,
            DBPWD, DBNAME and DBCHAR attributes
        @return: a pooled connection
        """
        pool_name = db_conf.DB_FULL_NAME
        if pool_name not in MysqlPoolClient.__pool:
            MysqlPoolClient.__pool[pool_name] = PooledDB(creator=pymysql,
                                                         mincached=1,
                                                         maxcached=20,
                                                         host=db_conf.DBHOST,
                                                         port=db_conf.DBPORT,
                                                         user=db_conf.DBUSER,
                                                         passwd=db_conf.DBPWD,
                                                         db=db_conf.DBNAME,
                                                         use_unicode=True,
                                                         charset=db_conf.DBCHAR,
                                                         cursorclass=DictCursor)
        return MysqlPoolClient.__pool[pool_name].connection()

    def getAll(self, sql, param=None):
        """
        @summary: run a statement and fetch every row
        @param sql: statement; use placeholders when ``param`` is given
        @param param: optional tuple/list of values bound to the placeholders
        @return: all rows, or False when the statement affected no rows
        """
        count = self.__query(sql, param, commit=False)
        return self._cursor.fetchall() if count > 0 else False

    def getOne(self, sql, param=None):
        """
        @summary: run a statement and fetch the first row
        @param sql: statement; use placeholders when ``param`` is given
        @param param: optional tuple/list of values bound to the placeholders
        @return: tuple ``(count, row)``; ``row`` is False when count is 0
        """
        count = self.__query(sql, param, commit=False)
        query_result = self._cursor.fetchone() if count > 0 else False
        return count, query_result

    def getMany(self, sql, num, param=None):
        """
        @summary: run a statement and fetch up to ``num`` rows
        @param sql: statement; use placeholders when ``param`` is given
        @param num: maximum number of rows to fetch
        @param param: optional tuple/list of values bound to the placeholders
        @return: the rows, or False when the statement affected no rows
        """
        count = self.__query(sql, param, commit=False)
        return self._cursor.fetchmany(num) if count > 0 else False

    def insertOne(self, sql, value=None):
        """
        @summary: insert one record
        @param sql: insert statement
        @param value: tuple/list of values to bind
        @return: id reported by ``SELECT @@IDENTITY``
        """
        self._cursor.execute(sql, value)
        return self.__getInsertId()

    def insertMany(self, sql, values):
        """
        @summary: insert several records
        @param sql: insert statement
        @param values: tuple(tuple)/list[list] of values to bind
        @return: count returned by ``executemany``
        """
        return self._cursor.executemany(sql, values)

    def updateMany(self, sql, values):
        """
        @summary: update several records
        @param sql: update statement
        @param values: tuple(tuple)/list[list] of values to bind
        @return: count returned by ``executemany``
        """
        return self._cursor.executemany(sql, values)

    def __getInsertId(self):
        """
        Return the id produced by the last insert on this connection.
        """
        self._cursor.execute("SELECT @@IDENTITY AS id")
        result = self._cursor.fetchall()
        return result[0]['id']

    def __query(self, sql, param=None, commit=True):
        """
        Execute a statement, optionally committing afterwards.

        :return: count returned by ``cursor.execute``
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if commit:
            self._conn.commit()
        return count

    def update(self, sql, param=None):
        """
        @summary: run an update statement and commit
        @param sql: statement with (%s,%s) placeholders
        @param param: tuple/list of values to bind
        @return: count of affected rows
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """
        @summary: run a delete statement and commit
        @param sql: statement with (%s,%s) placeholders
        @param param: tuple/list of values to bind
        @return: count of affected rows
        """
        return self.__query(sql, param)

    def begin(self):
        """
        @summary: switch autocommit off on the connection
        """
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """
        @summary: commit when ``option`` is 'commit', otherwise roll back
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, is_end=1):
        """
        @summary: finish the transaction and release cursor and connection
        @param is_end: 1 commits, any other value rolls back

        Safe to call more than once, and on an instance whose ``__init__``
        did not complete; both happen when ``__del__`` runs.
        """
        conn = getattr(self, "_conn", None)
        if conn is None:
            return
        with MysqlPoolClient.__lock:
            try:
                self.end('commit' if is_end == 1 else 'rollback')
            finally:
                # Release both resources even when the commit fails, and
                # mark the instance as disposed so a later call is a no-op.
                cursor = getattr(self, "_cursor", None)
                self._conn = None
                self._cursor = None
                try:
                    if cursor is not None:
                        cursor.close()
                finally:
                    conn.close()

0
utils/__init__.py

50
utils/push_kafka.py

@ -0,0 +1,50 @@
from kafka import KafkaProducer, KafkaConsumer
from config import logger
class SKafka():
    """Small wrapper around a ``KafkaProducer``."""

    def __init__(self, bootstrap_servers: str, api_version=None, is_json_schema: bool = False,encoding: str = "utf-8",
                 security_protocol: str = "PLAINTEXT", sasl_mechanism: str = None,
                 sasl_plain_username: str = None, sasl_plain_password: str = None):
        self.is_json_schema = is_json_schema
        self.encoding = encoding
        producer_options = dict(
            bootstrap_servers=bootstrap_servers,
            api_version=api_version,
            security_protocol=security_protocol,
            sasl_mechanism=sasl_mechanism,
            sasl_plain_username=sasl_plain_username,
            sasl_plain_password=sasl_plain_password,
        )
        self.producer = KafkaProducer(**producer_options)

    def sync_producer(self, topic, message):
        """
        Send every non-empty item of ``message`` to ``topic``.

        :param topic: destination topic
        :param message: sized iterable of strings; falsy items are skipped
        :return: None
        """
        sent = 0
        for item in message:
            if not item:
                continue
            self.producer.send(topic, item.encode())
            sent += 1
        logger.info('push {} success,date length:{} vail date length: {} '.format(topic, len(message), sent))

    def close_producer(self):
        """Close the producer, printing instead of raising on failure."""
        try:
            self.producer.close()
            print("connect close")
        except Exception as err:
            print(err)
# kafka设置了密码,没有就不用设置那么多
if __name__ == '__main__':
    # Manual check: print every message received on the topic.
    hosts = "172.18.1.119:9992"
    topic = "dazhongdianping"
    consumer = KafkaConsumer(topic, bootstrap_servers=hosts)
    for record in consumer:
        print(f"Received message: {record.value.decode('utf-8')}")

22
utils/tools.py

@ -0,0 +1,22 @@
from hashlib import md5
from config import CRAWLER_DB_CONF
from utils.MysqlData import MysqlPoolClient
def get_group_hash(data):
    """
    Return the hexadecimal MD5 digest of a string.

    :param data: text to hash; encoded as UTF-8 first
    :return: 32-character hex digest
    """
    return md5(data.encode("utf-8")).hexdigest()
def get_data_from_sql(sql, db=CRAWLER_DB_CONF, param=None):
    """
    Run a query and return its first row.

    :param sql: statement; use ``%s`` placeholders together with ``param``
    :param db: database configuration handed to ``MysqlPoolClient``
    :param param: optional tuple/list of values bound by the driver, so
        callers do not have to format values into the SQL themselves
    :return: the first row, or False when the query matched nothing
    """
    client_mysql = MysqlPoolClient(db)
    count, data = client_mysql.getOne(sql, param)
    return data
def get_data_from_sql1(sql, db=CRAWLER_DB_CONF, param=None):
    """
    Run a query and return all of its rows.

    :param sql: statement; use ``%s`` placeholders together with ``param``
    :param db: database configuration handed to ``MysqlPoolClient``
    :param param: optional tuple/list of values bound by the driver, so
        callers do not have to format values into the SQL themselves
    :return: all rows, or False when the query matched nothing
    """
    client_mysql = MysqlPoolClient(db)
    return client_mysql.getAll(sql, param)

56
utils/upload_files.py

@ -0,0 +1,56 @@
import aiohttp
import asyncio
import json
from aiohttp import FormData
from loguru import logger
from config import UPLOAD_URL
async def upload_file(file_path, url=UPLOAD_URL):
    """
    Upload a local file and return the path reported by the server.

    :param file_path: local file to upload; a falsy value skips the upload
    :param url: upload endpoint
    :return: the ``src`` value of the JSON response, or None when the upload
        is skipped or the response has no ``src``
    """
    if not file_path:
        return None
    async with aiohttp.ClientSession() as session:
        # The handle stays open for the whole request and is closed
        # afterwards; it was previously never closed.
        with open(file_path, 'rb') as handle:
            data = FormData()
            data.add_field("file", handle, content_type='multipart/form-data;charset=utf-8"')
            data.add_field("output", "json")
            async with session.post(url, data=data) as response:
                # The response body is a JSON string.
                result = json.loads(await response.text())
    return result.get('src')
async def download_file(url, filename):
    """
    Download a URL to a local file.

    :param url: address to fetch; the response status must be 200
    :param filename: path the content is written to
    :return: ``filename``
    """
    async with aiohttp.ClientSession() as http:
        logger.info(f"Starting download file from {url}")
        async with http.get(url) as reply:
            assert reply.status == 200
            with open(filename, "wb") as out:
                # Copy the body as it arrives until the stream is exhausted.
                chunk = await reply.content.readany()
                while chunk:
                    out.write(chunk)
                    chunk = await reply.content.readany()
            logger.info(f"Downloaded {filename} from {url}")
    return filename
if __name__ == '__main__':
    # Manual check: download one sample image.
    event_loop = asyncio.get_event_loop()
    sample_url = "http://172.18.1.180:9980/group17/default/20240507/17/07/3/image.jpg"
    event_loop.run_until_complete(download_file(sample_url, "../resources/image.jpg"))

12
uwsgi.ini

@ -0,0 +1,12 @@
[uwsgi]
chdir=/opt/crawl/telegram/telegram-crawler
http=0.0.0.0:8080
stats=%(chdir)/uwsgi/uwsgi.status
pidfile=%(chdir)/uwsgi/uwsgi.pid
wsgi-file=%(chdir)/server.py
daemonize=%(chdir)/uwsgi/uwsgi.log
callable=app
processes=2
threads=2
buffer-size=65536
env LANGUAGE="en_US.UTF-8"
Loading…
Cancel
Save