|
|
import time
from telethon.tl.types import MessageEntityTextUrl, ChannelFull
from utils.upload_files import download_file
async def download_media(message, file="resources"): """
下载上传 媒体文件 :param message: :param file: :return: """
if file: filename = await message.download_media(file=file) # 指定下载目录 组图会分为多个message传递 return filename
async def download_profile_photo(client, sender, file="profile"): """
下载用户头像 :param client: :param sender: :param file: :return: """
if isinstance(sender, ChannelFull): photo = sender.chat_photo else: # UserProfilePhoto 对象 默认头像这个字段为None photo = sender.photo if photo: path = await client.download_profile_photo(sender, file=file) # 下载头像 return path
async def get_messages(client, chat, message_id): """
获得某个群组指定id的聊天信息 :param client: :param chat: :param message_id: :return: """
message = await client.get_messages(chat, ids=message_id) print("message_1337!,", message) return message
async def get_extra_linked(message): """
获取聊天中的超链接 :param message: :return: """
entities = message.entities message_text = message.message other_link = [] if entities: text_urls = [entry for entry in entities if isinstance(entry, MessageEntityTextUrl)] for _ in text_urls: url = _.url text = message_text[_.offset: _.offset+_.length] # 内容可能由于某些字符导致偏移 造成只有原始文本的一部分 # logger.info(f"捕获到超链接:{text} => {url}") other_link.append(dict(url=url, offset=_.offset, length=_.length))
return other_link
async def send_messages(client, user, messages, allow_status, file="resources"): """
发送消息 :param client: :param user: :param messages: { "message": "", # 文本 "media_type": "", "media_link": "" } :param allow_status: :param file: 默认下载的路径 :return: """
result = "" map_dict = { "photos": ".jpg", "videos": ".mp4", "docs": "", } if messages: content = messages.get("message") media_type = f"send_{messages.get('media_type')}" media_link = messages.get("media_link") flag = 0 file_path = "" if content and ("send_plain" in allow_status.keys()) and allow_status["send_plain"]: # 判断是否允许文本发送 flag += 1
if (media_type in allow_status.keys()) and allow_status[media_type]: # 判断是否允许媒体发送 time_stamp = str(int(time.time())) file_type = map_dict[messages.get('media_type')] if map_dict[messages.get('media_type')] else \ media_link.split("/")[-1] file_name = f"{file}/{time_stamp}{file_type}"
file_path = await download_file(media_link, file_name) # 先下载 flag += 2
if flag == 0: result = "权限不支持" elif flag == 1: # 只发文本 result = await client.send_message(user, content) elif flag == 2: # 只发图片 result = await client.send_file(user, file_path) elif flag == 3: # 图文发送 result = await client.send_message(user, content, file=file_path)
return result else: return result
|