telegram 群组监控 / 群组功能
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

370 lines
15 KiB

import random
from pymysql.converters import escape_string
import json, os
import sys
from telethon.tl.functions.messages import GetFullChatRequest, ImportChatInviteRequest
from tg_utils.tg_api import send_messages
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler"
sys.path.append(root_path)
from tg_utils.tg_model import GroupFunc
from utils.tools import get_group_hash, get_data_from_sql, get_data_from_sql1
from telethon.tl.types import ChannelParticipantsAdmins, Updates, PeerChannel, Channel, Message
from config import PROXY, logger, CRAWLER_DB_CONF, TG_ROBOT_ACCOUNT_TABLE, TG_ROBOT_ACCOUNT_GROUP_TABLE
import asyncio
from telethon.tl.functions.channels import JoinChannelRequest
from utils.MysqlData import MysqlPoolClient
"""
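Telegram group helpers: log in with a robot account, sync the groups/channels it has joined into MySQL, join or leave groups, send messages and fetch group details.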
"""
async def get_groups(client, api_hash):
    """
    Sync the groups/channels the logged-in account has joined into MySQL.

    Every dialog flagged ``is_channel`` that exposes a public username is
    upserted into ``TG_ROBOT_ACCOUNT_GROUP_TABLE``; afterwards the account row
    in ``TG_ROBOT_ACCOUNT_TABLE`` gets ``total_group`` set to the number of
    dialogs that were saved.

    :param client: logged-in Telethon client
    :param api_hash: api_hash of the account, used as the account key in SQL
    :return: None
    """
    db = MysqlPoolClient(CRAWLER_DB_CONF)
    saved_count = 0
    all_dialogs = await client.get_dialogs()

    for dialog in all_dialogs:
        if not dialog.is_channel:
            continue

        entity = dialog.entity
        info = {
            "group_id": dialog.id,  # id of the monitored group or channel
            "group_name": dialog.name,  # title of the group or channel
            "participants_count": entity.participants_count,  # member count
            "group_owner": "",  # public username, may be missing
            "is_group": dialog.is_group,  # group vs. broadcast channel
            "description": "",
            "discussion": {}  # linked discussion group info
        }

        # Prefer the primary username, fall back to the first extra username.
        if entity.username:
            info["group_owner"] = entity.username
        elif entity.usernames:
            info["group_owner"] = entity.usernames[0].username

        if info["group_owner"]:
            info.update(await GroupFunc.get_related_group(client, info["group_owner"]))

        # Checked after the update on purpose: the detail dict is merged first.
        if not info["group_owner"]:
            logger.error(f"不存在username: {info}")
            continue

        saved_count += 1
        logger.info(f"Group: {info}")

        link = f"https://t.me/{info['group_owner']}"
        row = (
            api_hash,
            dialog.id,
            link,
            get_group_hash(f"{api_hash + link}"),
            escape_string(dialog.name),
            1,
        )
        upsert = (
            f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) "
            "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id)"
        ) % row
        logger.info(upsert)
        db.getOne(upsert)

    account_update = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (saved_count, api_hash)
    db.getOne(account_update)
    logger.info(f"get group lengths is {saved_count}")
async def join_channel(client, channel, is_private=False):
    """
    Join a group/channel and report its title, id and sending rights.

    Only a limited number of joins can be done in a row; the server then asks
    the account to wait before joining again (that error is left to the caller).

    :param client: logged-in Telethon client
    :param channel: public channel reference, or the invite hash when is_private
    :param is_private: True for private (joinchat) invites
    :return: dict with ``title``, ``group_id`` and ``allow_sending`` when the
        response is an ``Updates`` whose first chat is a ``Channel``,
        otherwise False
    """
    if is_private:
        request = ImportChatInviteRequest(channel)
    else:
        request = JoinChannelRequest(channel=channel)
    updates = await client(request)
    logger.info(updates)

    if not updates or not isinstance(updates, Updates):
        return False

    chat = updates.chats[0]
    if not isinstance(chat, Channel):
        return False

    # group_id is the raw channel id shifted into the -100... form.
    joined = {"title": chat.title, "group_id": -1000000000000 - chat.id, "allow_sending": None}

    # Sending rights are only looked up when the account is not banned.
    banned = await GroupFunc.is_banned(client, chat)
    if not banned:
        rights = await GroupFunc.get_group_rights(client, chat)
        joined["allow_sending"] = rights or None

    logger.info(f"返回的权限信息 ! => {joined}")
    return joined
async def leave_channel(client, channel):
    """
    Leave a group or channel by deleting its dialog.

    ``delete_dialog`` can be used as a user and as a bot. However, bots will
    only be able to use it to leave groups and channels (trying to delete a
    private conversation will do nothing).

    :param client: logged-in Telethon client
    :param channel: group/channel to leave
    :return: dict with ``title`` and ``group_id`` when the response is an
        ``Updates`` object, otherwise False
    """
    updates = await client.delete_dialog(channel)
    logger.info(updates)

    if not updates or not isinstance(updates, Updates):
        return False

    left_chat = updates.chats[0]
    return {"title": left_chat.title, "group_id": -1000000000000 - left_chat.id}
async def main(session_string, api_id, api_hash, channel="", is_private=False, is_leave=False):
    """
    Log in with the given account and join (or leave) one group/channel.

    :param session_string: Telethon session string of the account
    :param api_id: Telegram api_id of the account
    :param api_hash: Telegram api_hash of the account
    :param channel: group/channel to join or leave
    :param is_private: True when joining through a private invite
    :param is_leave: True to leave the group instead of joining it
    :return: result of join_channel / leave_channel (dict or False), or the
        exception text as a str when the operation raised
    """
    client = await GroupFunc.login(session_string, api_id, api_hash)
    try:
        if is_leave:
            res = await leave_channel(client, channel)
        else:
            res = await join_channel(client, channel, is_private=is_private)
    except Exception as e:
        # Errors are reported to the caller as text; update_sql inspects it.
        logger.error(f"join error: {e}")
        res = str(e)
    finally:
        # Runs even for BaseException subclasses (e.g. task cancellation),
        # which the except clause above does not catch.
        await client.disconnect()
    return res
async def init_group(session_string, api_id, api_hash):
    """
    Log in with one account and sync its joined groups into the database.

    :param session_string: Telethon session string of the account
    :param api_id: Telegram api_id of the account
    :param api_hash: Telegram api_hash of the account
    :return: None
    """
    client = await GroupFunc.login(session_string, api_id, api_hash)
    try:
        await get_groups(client, api_hash)
    finally:
        # Always release the connection, even when the sync fails midway.
        await client.disconnect()
async def mul_account():
    """
    Sync the joined groups of every robot account, one account at a time.

    Accounts are read from ``TG_ROBOT_ACCOUNT_TABLE`` ordered by update_time
    and processed sequentially.

    :return: None
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    sql = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by update_time"
    accounts = client_mysql.getAll(sql)
    # `or ()` tolerates a falsy result when the table has no rows.
    for account in accounts or ():
        # Log only the api_id: the row also carries session_string and
        # api_hash, which must not be written to stdout or logs.
        logger.info(f"init groups for account: {account['api_id']}")
        await init_group(account["session_string"], account["api_id"], account["api_hash"])
def update_sql(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a join attempt.

    :param res: result of main(): a dict on success, the error text (str) when
        the join raised, or False when the response could not be interpreted
    :param telegram_link: link of the group that was joined
    :param group: account row; ``api_hash`` and ``total_group`` are read
    :param client_mysql: MySQL client exposing ``getOne(sql)``
    :return: None
    """
    state_203 = "You have successfully requested to join this chat or channel"
    joined = isinstance(res, dict)
    # A join request awaiting approval surfaces as an error message. Only
    # search str results: `in` on False (a possible main() result) raises
    # TypeError.
    pending = isinstance(res, str) and state_203 in res

    if not (joined or pending):
        # Failed join: only touch update_time (presumably so account selection
        # ordered by update_time rotates — verify against run()).
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
        return

    group_status = 1 if joined else 0
    group_id = res["group_id"] if joined else ""
    group_title = escape_string(res["title"]) if joined else ""
    sql = f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) " \
          "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id) " \
          % (group["api_hash"], group_id,
             escape_string(telegram_link),  # caller-supplied, escape for SQL
             get_group_hash(f"{group['api_hash'] + telegram_link}"),  # hash uses the raw link
             group_title, group_status)
    client_mysql.getOne(sql)

    if joined:  # only a completed join increases the account's group count
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
            group["total_group"] + 1, group["api_hash"])
        client_mysql.getOne(sql_robot)
def update_sql1(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a leave attempt.

    :param res: result of main(); a dict means the group was left
    :param telegram_link: link of the group that was left
    :param group: account row; ``api_hash`` and ``total_group`` are read
    :param client_mysql: MySQL client exposing ``getOne(sql)``
    :return: None
    """
    if not isinstance(res, dict):
        # Leave failed: only refresh the account's update_time.
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
        return

    # Mark the group as not joined. telegram_link is caller-supplied, so it is
    # escaped before being interpolated into the statement.
    # NOTE(review): this matches on group_link only, so rows of every account
    # with the same link are reset — confirm that is intended.
    sql = f"UPDATE {TG_ROBOT_ACCOUNT_GROUP_TABLE} set group_status=0 where group_link='%s'" % (
        escape_string(telegram_link))
    client_mysql.getOne(sql)

    # One group fewer for this account.
    sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
        group["total_group"] - 1, group["api_hash"])
    client_mysql.getOne(sql_robot)
async def run(telegram_link, channel, is_private=False, is_leave=False):
    """
    Join or leave a group, consulting the database first.

    :param telegram_link: link of the group, used as the lookup key in SQL
    :param channel: group/channel reference handed to Telethon
    :param is_private: True when the group is private (invite link)
    :param is_leave: True for a leave request
    :return: for leave requests a status string; for join requests the result
        of main() (dict, error text or False), or the stored title/id when the
        group is already joined
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    # telegram_link is caller-supplied: escape it before building the query.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link='%s' and a.api_hash=b.api_hash" % escape_string(telegram_link)
    _, group = client_mysql.getOne(sql_group)

    if is_leave:
        if not group:
            return "该群不存在"
        if group["group_status"] == 0:  # never joined / already left
            return "退群成功"
        res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_leave=True)
        logger.info(f"退群,账号信息:{group['api_id']}")
        update_sql1(res, telegram_link, group, client_mysql)
        # NOTE(review): success is reported even when res is not a dict
        # (i.e. the leave failed) — kept as in the original, confirm intent.
        return "退群成功"

    if group:
        if group["group_status"] == 0:  # status 0: join still needs approval, retry
            res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
            logger.info(f"重新加群,账号信息:{group['api_id']}")
            update_sql(res, telegram_link, group, client_mysql)
            return res
        return {"title": group["group_title"], "group_id": group["group_id"]}

    # Group not known yet: pick the first account ordered by total_group, update_time.
    sql_robot = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by total_group, update_time"
    _, group = client_mysql.getOne(sql_robot)
    if not group:
        # No robot account configured; report failure instead of crashing on
        # a subscript of an empty result.
        logger.error("no robot account available to join group")
        return False
    logger.info(f"获取账号:{group['api_id']}")
    res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
    update_sql(res, telegram_link, group, client_mysql)
    return res
async def _replay(telegram_link, channel, message_list):
    """
    Send messages to a joined group and report the sending permission.

    :param telegram_link: link of the group, used to look up the joined account
    :param channel: group/channel reference resolved through Telethon
    :param message_list: messages handed to send_messages
    :return: dict with ``send_message_status`` (status text, or whatever
        send_messages returned when it is not a Message) and ``allow_sending``
    """
    # Look up whether (and with which account) this group was joined.
    # telegram_link is caller-supplied, so escape it before interpolation.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link='%s' and a.api_hash=b.api_hash" % escape_string(telegram_link)
    group = get_data_from_sql(sql_group)
    result = {
        "send_message_status": "",  # "发送成功" means the message was sent
        "allow_sending": None
    }

    if not (group and group["group_status"]):
        # Group unknown or not joined successfully.
        result["send_message_status"] = "群组并未添加,检查群组状态"
        return result

    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        group_entity = await client.get_entity(channel)
        is_banned = await GroupFunc.is_banned(client, group_entity)
        if is_banned:
            result["send_message_status"] = "群组管理员禁止发送消息"
        else:
            allow_status = await GroupFunc.get_group_rights(client, group_entity)
            result["allow_sending"] = allow_status if allow_status else None
            if allow_status:
                # A Message instance means success; anything else is passed
                # through as the status.
                mess_status = await send_messages(client, group_entity, message_list, allow_status)
                logger.info(mess_status)
                result["send_message_status"] = "发送成功" if isinstance(mess_status, Message) else mess_status
            else:
                result["send_message_status"] = "当前群组不支持发送消息"
    finally:
        # Release the connection even when a Telegram call above raises.
        await client.disconnect()
    return result
async def _detail(telegram_link, file="profile"):
    """
    Fetch the details of one group using a randomly chosen robot account.

    :param telegram_link: group link/username passed to GroupFunc.get_group
    :param file: forwarded to GroupFunc.get_group as ``file``
    :return: whatever GroupFunc.get_group returns
    """
    sql_group = f"select api_id, session_string, api_hash from {TG_ROBOT_ACCOUNT_TABLE}"
    group_list = get_data_from_sql1(sql_group)
    # Pick a random account; random.choice raises IndexError on an empty sequence.
    group = random.choice(group_list)
    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        # Per the original note, this call also uploads the group's avatar.
        group_detail = await GroupFunc.get_group(client, telegram_link, file=file)
    finally:
        # Release the connection even when the lookup raises.
        await client.disconnect()
    logger.info(f"当前使用账号: {group['api_id']} => {telegram_link} => {group_detail}")
    return group_detail
if __name__ == "__main__":
    # Manual smoke run: fetch the details of one public channel.
    # asyncio.run creates a fresh event loop and closes it afterwards, which
    # replaces the deprecated get_event_loop()/run_until_complete pairing.
    try:
        asyncio.run(_detail("rtnoticias"))
    except Exception as e:
        print(f"An error occurred: {e}")