You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
370 lines
15 KiB
370 lines
15 KiB
import random
|
|
|
|
from pymysql.converters import escape_string
|
|
import json, os
|
|
import sys
|
|
|
|
from telethon.tl.functions.messages import GetFullChatRequest, ImportChatInviteRequest
|
|
|
|
from tg_utils.tg_api import send_messages
|
|
|
|
root_path = os.path.abspath(os.path.dirname(__file__)).split('telegram_crawler')[0] + "telegram_crawler"
|
|
sys.path.append(root_path)
|
|
|
|
from tg_utils.tg_model import GroupFunc
|
|
from utils.tools import get_group_hash, get_data_from_sql, get_data_from_sql1
|
|
|
|
from telethon.tl.types import ChannelParticipantsAdmins, Updates, PeerChannel, Channel, Message
|
|
|
|
from config import PROXY, logger, CRAWLER_DB_CONF, TG_ROBOT_ACCOUNT_TABLE, TG_ROBOT_ACCOUNT_GROUP_TABLE
|
|
import asyncio
|
|
from telethon.tl.functions.channels import JoinChannelRequest
|
|
|
|
from utils.MysqlData import MysqlPoolClient
|
|
|
|
"""
|
|
初始化登陆,获取群组(频道)信息,并保存本地
|
|
"""
|
|
|
|
|
|
async def get_groups(client, api_hash):
    """
    Sync the channels/supergroups joined by the current account into MySQL.

    Every dialog flagged ``is_channel`` that exposes a public username is
    upserted into ``TG_ROBOT_ACCOUNT_GROUP_TABLE``.  Afterwards the account
    row in ``TG_ROBOT_ACCOUNT_TABLE`` gets ``total_group`` set to the number
    of dialogs that were stored.

    :param client: logged-in client (as returned by ``GroupFunc.login``)
    :param api_hash: api_hash of the account; used as the key in both tables
    :return: None
    """
    db = MysqlPoolClient(CRAWLER_DB_CONF)
    saved_count = 0

    for dialog in await client.get_dialogs():
        # Only dialogs that are channels (broadcast channels / supergroups).
        if not dialog.is_channel:
            continue

        entity = dialog.entity
        info = {
            "group_id": dialog.id,  # id of the group or channel
            "group_name": dialog.name,  # title of the group or channel
            "participants_count": entity.participants_count,  # member count
            "group_owner": "",  # public username; may be missing
            "is_group": dialog.is_group,  # group vs. channel
            "description": "",  # description text
            "discussion": {},  # related discussion-group info
        }

        # Prefer the primary username, fall back to the first extra username.
        if entity.username:
            info["group_owner"] = entity.username
        elif entity.usernames:
            info["group_owner"] = entity.usernames[0].username

        owner = info["group_owner"]
        if owner:
            info.update(await GroupFunc.get_related_group(client, owner))

        # Checked after the update above, which may have touched the key.
        if not info["group_owner"]:
            logger.error(f"不存在username: {info}")
            continue

        saved_count += 1
        logger.info(f"Group: {info}")

        telegram_link = f"https://t.me/{info['group_owner']}"
        group_hash = get_group_hash(f"{api_hash + telegram_link}")
        insert_sql = (
            f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) "
            "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id)"
        ) % (api_hash, dialog.id, telegram_link, group_hash, escape_string(dialog.name), 1)
        logger.info(insert_sql)
        db.getOne(insert_sql)

    # Record how many groups were stored for this account.
    account_sql = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (saved_count, api_hash)
    db.getOne(account_sql)

    logger.info(f"get group lengths is {saved_count}")
|
|
|
|
|
|
async def join_channel(client, channel, is_private=False):
    """
    Join a public channel/group, or a private one through an invite hash.

    Original author's note: many groups cannot be joined in one go; Telegram
    answers with how long to wait before the next join is allowed.

    :param client: logged-in client used to send the request
    :param channel: channel reference, or the invite hash when ``is_private``
    :param is_private: whether this is a private (joinchat) group
    :return: ``{"title", "group_id", "allow_sending"}`` when the reply is an
        ``Updates`` whose first chat is a ``Channel``; otherwise ``False``
    """
    if is_private:
        request = ImportChatInviteRequest(channel)
    else:
        request = JoinChannelRequest(channel=channel)

    updates = await client(request)
    logger.info(updates)  # Updates.chats[0].username

    if not (updates and isinstance(updates, Updates)):
        return False

    chat = updates.chats[0]
    if not isinstance(chat, Channel):
        return False

    joined = {
        "title": chat.title,
        "group_id": -1000000000000 - chat.id,
        "allow_sending": None,
    }

    # Sending rights are only looked up when the account is not banned.
    banned = await GroupFunc.is_banned(client, chat)
    if not banned:
        rights = await GroupFunc.get_group_rights(client, chat)
        joined["allow_sending"] = rights or None

    logger.info(f"返回的权限信息 ! => {joined}")
    return joined
|
|
|
|
|
|
async def leave_channel(client, channel):
    """
    Leave a group or channel by deleting its dialog.

    Original author's note: the method works for users and bots, but bots can
    only use it to leave groups and channels (deleting a private conversation
    does nothing).

    :param client: logged-in client
    :param channel: the group/channel to leave
    :return: ``{"title", "group_id"}`` when the reply is an ``Updates``
        object, otherwise ``False``
    """
    updates = await client.delete_dialog(channel)  # leave groups and channels
    logger.info(updates)  # Updates.chats[0].username

    if not (updates and isinstance(updates, Updates)):
        return False

    left_chat = updates.chats[0]
    return {"title": left_chat.title, "group_id": -1000000000000 - left_chat.id}
|
|
|
|
|
|
async def main(session_string, api_id, api_hash, channel="", is_private=False, is_leave=False):
    """
    Log in with the given account, join or leave ``channel``, then disconnect.

    :param session_string: session string of the account
    :param api_id: api_id of the account
    :param api_hash: api_hash of the account
    :param channel: group/channel to join or leave
    :param is_private: whether the target is a private (joinchat) group;
        only used when joining
    :param is_leave: leave the group instead of joining it
    :return: the value returned by ``join_channel`` / ``leave_channel``
        (a dict or ``False``), or ``str(exception)`` when the request raised
    """
    client = await GroupFunc.login(session_string, api_id, api_hash)
    action = "leave" if is_leave else "join"
    try:
        if is_leave:
            res = await leave_channel(client, channel)
        else:
            res = await join_channel(client, channel, is_private=is_private)
    except Exception as e:
        # Callers inspect the message text, so the error is returned as a string.
        logger.error(f"{action} error: {e}")
        res = str(e)
    finally:
        # Also runs on cancellation, so the connection is never left open.
        await client.disconnect()
    return res
|
|
|
|
|
|
async def init_group(session_string, api_id, api_hash):
    """
    Log in with one account and sync its joined groups into the database.

    :param session_string: session string of the account
    :param api_id: api_id of the account
    :param api_hash: api_hash of the account
    :return: None
    :raises: whatever ``get_groups`` raises; the client is disconnected first
    """
    client = await GroupFunc.login(session_string, api_id, api_hash)
    try:
        await get_groups(client, api_hash)
    finally:
        # Release the connection even when the sync fails half-way.
        await client.disconnect()
|
|
|
|
|
|
async def mul_account():
    """
    Sync group information for every account in the robot-account table.

    Accounts are read ordered by ``update_time`` and processed one at a time.

    :return: None
    """
    db = MysqlPoolClient(CRAWLER_DB_CONF)
    accounts = db.getAll(f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by update_time")

    for account in accounts:
        api_id, api_hash, session_string = (
            account["api_id"],
            account["api_hash"],
            account["session_string"],
        )
        # NOTE(review): this prints the whole row, session string included.
        print(account)
        await init_group(session_string, api_id, api_hash)
|
|
|
|
|
|
def update_sql(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a join attempt.

    * ``res`` is a dict (joined): upsert the group row with status 1 and
      store ``group["total_group"] + 1`` on the account.
    * ``res`` is a string containing the "join requested" message (approval
      pending): upsert the group row with status 0 and empty id/title.
    * anything else (error text, ``False``): only refresh the account's
      ``update_time``.

    :param res: result of ``main``: a dict, an error string, or ``False``
    :param telegram_link: link of the group, stored as ``group_link``
    :param group: row providing ``api_hash`` and ``total_group``
    :param client_mysql: database client exposing ``getOne(sql)``
    :return: None
    """
    state_203 = "You have successfully requested to join this chat or channel"

    joined = isinstance(res, dict)
    # ``res`` can be False (non-Updates reply); a bare ``state_203 in res``
    # would raise TypeError, so only real strings are searched.
    pending = isinstance(res, str) and state_203 in res

    # NOTE(review): the reviewed copy had lost its indentation; this branch is
    # taken to pair with the outer condition, mirroring update_sql1 - confirm.
    if not (joined or pending):
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
        return

    group_status = 1 if joined else 0
    group_id = res["group_id"] if joined else ""
    group_title = escape_string(res["title"]) if joined else ""
    # The hash uses the raw link; only the SQL literal is escaped.
    group_hash = get_group_hash(f"{group['api_hash'] + telegram_link}")

    sql = f"INSERT INTO {TG_ROBOT_ACCOUNT_GROUP_TABLE}(`api_hash`, `group_id`, `group_link`, `group_hash`, `group_title`, `group_status`, `create_time`, `update_time`) " \
          "VALUES('%s','%s', '%s', '%s', '%s', '%s', NOW(), NOW()) on duplicate key update update_time=Now(), group_title=values(group_title), group_id=values(group_id) " \
          % (group["api_hash"], group_id, escape_string(str(telegram_link)),
             group_hash, group_title, group_status)
    client_mysql.getOne(sql)

    if joined:  # one more joined group for this account
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
            group["total_group"] + 1, group["api_hash"])
        client_mysql.getOne(sql_robot)
|
|
|
|
|
|
def update_sql1(res, telegram_link, group, client_mysql):
    """
    Persist the outcome of a leave attempt.

    When ``res`` is a dict (left successfully) the group rows with this link
    get ``group_status=0`` and the account stores ``group["total_group"] - 1``.
    Otherwise only the account's ``update_time`` is refreshed.

    :param res: result of ``main`` for a leave request
    :param telegram_link: link of the group, matched against ``group_link``
    :param group: row providing ``api_hash`` and ``total_group``
    :param client_mysql: database client exposing ``getOne(sql)``
    :return: None
    """
    if not isinstance(res, dict):
        sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set update_time=NOW() where api_hash='%s'" % (group["api_hash"])
        client_mysql.getOne(sql_robot)
        return

    # Mark the group as no longer joined; the link is escaped because it is
    # caller-supplied text placed inside a quoted SQL literal.
    sql = f"UPDATE {TG_ROBOT_ACCOUNT_GROUP_TABLE} set group_status=0 where group_link='%s'" % (
        escape_string(str(telegram_link)))
    client_mysql.getOne(sql)

    # One fewer joined group for this account.
    sql_robot = f"update {TG_ROBOT_ACCOUNT_TABLE} set total_group=%d where api_hash='%s'" % (
        group["total_group"] - 1, group["api_hash"])
    client_mysql.getOne(sql_robot)
|
|
|
|
|
|
async def run(telegram_link, channel, is_private=False, is_leave=False):
    """
    Entry point for a join/leave task; consults the database first.

    Leave request: returns "该群不存在" when no row exists for the link, and
    "退群成功" otherwise (the Telegram call is made only when the stored
    ``group_status`` is non-zero).

    Join request: an existing row with non-zero status is returned straight
    from the database; an existing row with status 0 is retried with the same
    account; with no row, the first account ordered by
    ``total_group, update_time`` is used.

    :param telegram_link: link of the group, used as the lookup key
    :param channel: channel reference / invite hash passed to Telegram
    :param is_private: whether the target is a private group
    :param is_leave: whether this is a leave request
    :return: a status string for leave requests; for joins the result of
        ``main`` or ``{"title", "group_id"}`` taken from the database
    """
    client_mysql = MysqlPoolClient(CRAWLER_DB_CONF)
    # The link is caller-supplied and lands inside a quoted SQL literal.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link='%s' and a.api_hash=b.api_hash" % escape_string(str(telegram_link))
    count, group = client_mysql.getOne(sql_group)

    if is_leave:
        if not group:
            return "该群不存在"
        if group["group_status"] == 0:
            # Never joined successfully, or already left.
            return "退群成功"
        res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_leave=True)
        logger.info(f"退群,账号信息:{group['api_id']}")
        update_sql1(res, telegram_link, group, client_mysql)
        return "退群成功"

    if group:
        if group["group_status"] == 0:
            # Status 0 means the earlier join is still pending; retry with
            # the same account.
            res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
            logger.info(f"重新加群,账号信息:{group['api_id']}")
            update_sql(res, telegram_link, group, client_mysql)
            return res
        return {"title": group["group_title"], "group_id": group["group_id"]}

    # No row for this link yet: take the first account by (total_group, update_time).
    sql_robot = f"select * from {TG_ROBOT_ACCOUNT_TABLE} order by total_group, update_time"
    count, group = client_mysql.getOne(sql_robot)
    # NOTE(review): if the account table is empty, ``group`` is presumably
    # falsy and the subscript below raises - confirm desired handling.
    logger.info(f"获取账号:{group['api_id']}")
    res = await main(group["session_string"], group["api_id"], group["api_hash"], channel, is_private)
    update_sql(res, telegram_link, group, client_mysql)
    return res
|
|
|
|
|
|
async def _replay(telegram_link, channel, message_list):
    """
    Send messages to a joined group and report its sending permission.

    The group is looked up by ``telegram_link``; messages are only sent when
    the stored ``group_status`` is truthy, the account is not banned and
    ``GroupFunc.get_group_rights`` returns a truthy value.

    :param telegram_link: link of the group, used as the lookup key
    :param channel: reference resolved with ``client.get_entity``
    :param message_list: messages handed to ``send_messages``
    :return: ``{"send_message_status": ..., "allow_sending": ...}``
    :raises: errors from the Telegram calls propagate after the client has
        been disconnected
    """
    # Check first whether this group was joined successfully.
    # The link is caller-supplied and lands inside a quoted SQL literal.
    sql_group = f"select a.*, b.api_id, b.session_string, b.total_group from {TG_ROBOT_ACCOUNT_GROUP_TABLE} a, {TG_ROBOT_ACCOUNT_TABLE} b where group_link='%s' and a.api_hash=b.api_hash" % escape_string(str(telegram_link))
    group = get_data_from_sql(sql_group)
    result = {
        "send_message_status": "",  # set to the success text when sending worked
        "allow_sending": None,
    }

    if not (group and group["group_status"]):
        result["send_message_status"] = "群组并未添加,检查群组状态"
        return result

    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        group_entity = await client.get_entity(channel)
        is_banned = await GroupFunc.is_banned(client, group_entity)
        if is_banned:
            result["send_message_status"] = "群组管理员禁止发送消息"
        else:
            allow_status = await GroupFunc.get_group_rights(client, group_entity)
            result["allow_sending"] = allow_status if allow_status else None
            if allow_status:
                # A Message instance counts as success; any other value is
                # passed through as the status.
                mess_status = await send_messages(client, group_entity, message_list, allow_status)
                logger.info(mess_status)
                result["send_message_status"] = "发送成功" if isinstance(mess_status, Message) else mess_status
            else:
                result["send_message_status"] = "当前群组不支持发送消息"
    finally:
        # Previously skipped when any call above raised, leaking the client.
        await client.disconnect()

    return result
|
|
|
|
|
|
async def _detail(telegram_link, file="profile"):
    """
    Fetch the details of a group using a randomly chosen account.

    :param telegram_link: group reference passed to ``GroupFunc.get_group``
    :param file: forwarded to ``GroupFunc.get_group`` as ``file``
    :return: whatever ``GroupFunc.get_group`` returns
    :raises: errors from ``GroupFunc.get_group`` propagate after the client
        has been disconnected; ``random.choice`` raises on an empty list
    """
    sql_group = f"select api_id, session_string, api_hash from {TG_ROBOT_ACCOUNT_TABLE}"
    group_list = get_data_from_sql1(sql_group)
    group = random.choice(group_list)  # pick one account at random

    client = await GroupFunc.login(group["session_string"], group["api_id"], group["api_hash"])
    try:
        # Per the original comment, get_group also uploads the avatar.
        group_detail = await GroupFunc.get_group(client, telegram_link, file=file)
    finally:
        # Release the connection even when the lookup fails.
        await client.disconnect()

    logger.info(f"当前使用账号: {group['api_id']} => {telegram_link} => {group_detail}")
    return group_detail
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual run: fetch the details of one hard-coded group and print any
    # error instead of letting it propagate.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(_detail("rtnoticias"))
    except Exception as exc:
        print(f"An error occurred: {exc}")
|