from telethon import TelegramClient, functions
from telethon.sessions import StringSession

from config import logger, PROXY, TG_ROBOT_ACCOUNT_TABLE
from tg_utils.tg_api import download_profile_photo
from utils.upload_files import upload_file


class GroupFunc():
    """
    Helpers for a subset of Telegram group/channel operations.

    TODO: more group features to be added.
    """

    @staticmethod
    async def login(session_string, api_id, api_hash):
        """
        Log in using a serialized session string.

        :param session_string: string accepted by ``StringSession``
        :param api_id: Telegram API id
        :param api_hash: Telegram API hash
        :return: a started ``TelegramClient``; the caller is responsible for
            disconnecting it
        """
        # Log in from the session string; a local session file would also
        # work but is not recommended here.
        session = StringSession(session_string)
        client = TelegramClient(session, api_id, api_hash, timeout=60, proxy=PROXY)
        await client.start()
        return client

    @staticmethod
    async def get_login(api_id, api_hash, client_mysql):
        """
        Re-login from the existing session and export it as a session string.

        The client is created with ``api_hash`` as its session name. When a
        non-empty session string is produced it is written to
        ``TG_ROBOT_ACCOUNT_TABLE`` for the row whose ``api_hash`` matches.

        :param api_id: Telegram API id
        :param api_hash: Telegram API hash (also used as the session name)
        :param client_mysql: DB helper exposing ``getOne(sql)``
        :return: the exported session string
        """
        # Create the Telethon client and authenticate.
        client = TelegramClient(api_hash, api_id, api_hash, timeout=60, proxy=PROXY)
        try:
            await client.start()
            string_session = StringSession.save(client.session)
            # NOTE(review): the session string is a login credential; logging
            # it in clear text is risky - consider masking or removing.
            logger.info(f"sss: {string_session}")
            if string_session:
                # Persist the refreshed session.
                # NOTE(review): SQL is built by string interpolation. Switch to
                # a parameterized query if client_mysql supports bound
                # parameters (its API is not visible from this file).
                sql = f"update {TG_ROBOT_ACCOUNT_TABLE} set session_string='{string_session}' where api_hash='{api_hash}'"
                client_mysql.getOne(sql)
        finally:
            # Always release the connection, even if login or the DB update
            # raised.
            await client.disconnect()
        return string_session

    @staticmethod
    async def get_related_group(client, group_name):
        """
        Fetch a channel's description and its linked discussion group.

        :param client: connected ``TelegramClient``
        :param group_name: channel reference passed to ``GetFullChannelRequest``
        :return: dict with ``description`` and ``discussion``; ``discussion``
            is ``{}`` when there is no linked chat, otherwise
            ``{"group_id": <id>}``
        """
        detail = {
            "description": "",
            "discussion": {},
        }
        full = await client(functions.channels.GetFullChannelRequest(channel=group_name))
        full_channel = full.full_chat
        detail["description"] = full_channel.about
        discussion_id = full_channel.linked_chat_id
        if discussion_id:
            detail["discussion"] = {
                # Offset the raw id by -1000000000000 (presumably the
                # "-100"-prefixed channel id form - confirm with consumers).
                "group_id": -1000000000000 - discussion_id
            }
        return detail

    @staticmethod
    async def get_group(client, channel, file="profile"):
        """
        Fetch basic information about a group/channel.

        :param client: connected ``TelegramClient``
        :param channel: channel reference passed to ``GetFullChannelRequest``
        :param file: download location handed to ``download_profile_photo``
        :return: dict with ``group_id``, ``title``, ``photo``, ``description``
            and ``participants_count``
        """
        group = {
            "group_id": "",
            "title": "",
            "photo": "",
            "description": "",
            "participants_count": "",
        }
        full = await client(functions.channels.GetFullChannelRequest(channel=channel))
        full_channel = full.full_chat
        group["description"] = full_channel.about
        # Same -1000000000000 offset as used for the discussion group id.
        group["group_id"] = -1000000000000 - full_channel.id
        # Pick the chat entry that actually matches the requested channel
        # instead of assuming it is listed first; fall back to the first
        # entry when no id matches.
        chats = full.chats
        matched = next(
            (chat for chat in chats if getattr(chat, "id", None) == full_channel.id),
            chats[0],
        )
        group["title"] = matched.title
        group["participants_count"] = full_channel.participants_count
        path = await download_profile_photo(client, full_channel, file=file)
        group["photo"] = await upload_file(path)
        # logger.info(group)
        return group

    @staticmethod
    async def get_group_rights(client, channel):
        """
        Determine which kinds of content may be sent to a group by default.

        :param client: connected ``TelegramClient``
        :param channel: either a string resolved via ``client.get_entity`` or
            an already-resolved entity object
        :return: ``False`` when the entity has no default banned rights
            (entity type not suitable); otherwise a dict with the boolean keys
            ``send_photos``, ``send_videos``, ``send_plain`` and ``send_docs``.
            All values are ``False`` when sending messages is banned entirely.
        """
        _allow_status = {
            "send_photos": False,
            "send_videos": False,
            "send_plain": False,
            "send_docs": False,
        }
        if isinstance(channel, str):
            # Resolve the reference so the entity's rights can be inspected.
            group_entity = await client.get_entity(channel)
        else:
            group_entity = channel
        # Default rights object of the group; entities that lack the
        # attribute are treated the same as "no rights" below.
        rights = getattr(group_entity, "default_banned_rights", None)
        if rights:
            # So far only groups have been observed to carry this value.
            # Read the title from the resolved entity: `channel` may be a
            # plain string, which has no `.title`.
            logger.info(f"群组权限: {group_entity.title} => {rights}")
            if not rights.send_messages:
                # The flags are "banned" flags, so invert them to get what is
                # allowed.
                _allow_status = {
                    "send_photos": not rights.send_photos,  # photos
                    "send_videos": not rights.send_videos,  # videos
                    "send_plain": not rights.send_plain,  # plain text
                    "send_docs": not rights.send_docs,  # documents
                }
        else:
            # Entity type does not meet the requirements.
            _allow_status = False
        return _allow_status

    @staticmethod
    async def is_banned(client, channel):
        """
        Check whether the current account is banned in a group.

        :param client: connected ``TelegramClient``
        :param channel: group/channel reference passed to ``get_permissions``
        :return: the ``is_banned`` value of the permissions for ``'me'``
        """
        permissions = await client.get_permissions(channel, 'me')
        logger.info(f"是否存在权限 ! => {permissions.is_banned}")
        return permissions.is_banned