算法暴露接口(xhs、dy、ks、wx、hnw)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

142 lines
4.1 KiB

import json
import random
import aiohttp
import requests
from aiohttp import FormData
from loguru import logger
import functools, time
def retry(exceptions: (BaseException, tuple, list) = BaseException,
          max_retries: int = 2,
          delay: int = 1,
          sleep=time.sleep,
          if_result=None):
    """
    Build a decorator that re-runs a function when it raises or returns a rejected result.

    The wrapped function is attempted at most ``max_retries + 1`` times (the
    first call plus ``max_retries`` retries), with ``sleep(delay)`` between
    attempts. If ``max_retries`` is negative the function is never called and
    ``None`` is returned.

    :param exceptions: exception class, or tuple/list of classes, that trigger a retry
    :param max_retries: number of retries after the first attempt
    :param delay: value passed to ``sleep`` between attempts
    :param sleep: callable used to wait; defaults to ``time.sleep``
    :param if_result: optional predicate called as ``if_result(*result)``; a falsy
        answer counts as a failure and triggers a retry, so the wrapped function
        must return an unpackable value when this is set
    :return: a decorator
    :raises Exception: the last error once attempts are exhausted; for a rejected
        result this is an ``Exception`` with the message
        "The result is not as expected!"
    """
    if isinstance(exceptions, (tuple, list)):
        caught = tuple(exceptions)
    else:
        caught = (exceptions,)

    class _UnexpectedResult(Exception):
        """Raised internally when ``if_result`` rejects a return value."""

    # A rejected result must always be retryable, even when the caller narrowed
    # ``exceptions`` to types that do not include plain ``Exception``.
    retryable = caught + (_UnexpectedResult,)

    def attempt(func, *args, **kwargs):
        # Run one attempt and validate its result when a predicate is configured.
        val = func(*args, **kwargs)
        if if_result and not if_result(*val):
            raise _UnexpectedResult("The result is not as expected!")
        return val

    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            for count in range(max_retries + 1):
                try:
                    return attempt(func, *args, **kwargs)
                except retryable:
                    if count >= max_retries:
                        # Out of retries: surface the last failure unchanged.
                        raise
                    sleep(delay)
            return None
        return wrapped
    return decorator
def cost_time(func):
    """
    Decorator that logs how long each call to ``func`` takes.

    The elapsed wall-clock time (from ``time.time()``) is written through
    ``logger.info`` after the call returns; nothing is logged if ``func`` raises.

    :param func: callable to time
    :return: wrapper that forwards all arguments and returns ``func``'s result
    """
    def timed(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        logger.info(f"{func.__name__} 请求耗时:{time.time() - began}")
        return outcome

    # Copy name/docstring metadata from the wrapped function onto the wrapper.
    return functools.wraps(func)(timed)
def retry_if_result_none(status, content):
    """
    Result predicate for ``retry``: report whether a download succeeded.

    Despite the name, this returns True when the result is acceptable
    (status 200) and False when it should be retried.

    :param status: HTTP status code of the attempt
    :param content: response object; not inspected
    :return: True if ``status`` equals 200, otherwise False
    """
    return status == 200
@retry(max_retries=3, delay=1, if_result=retry_if_result_none)
def download_q(url, headers, cookies, data=None, is_proxy=False, timeout=10):
    """
    Fetch ``url`` with ``requests`` and return ``(status, response)``.

    A truthy ``data`` sends a POST with that body; otherwise a GET is sent.
    Any exception raised by the request is logged and reported as
    ``(0, None)``. The ``retry`` decorator rejects every non-200 status, so a
    failing download is retried up to 3 times with a 1-second delay, after
    which ``retry`` raises instead of returning.

    :param url: target URL
    :param headers: request headers
    :param cookies: request cookies
    :param data: request body; when falsy a GET is issued instead of a POST
    :param is_proxy: when True, route the request through the configured proxy
    :param timeout: request timeout passed to ``requests``
    :return: tuple ``(status_code, response)``
    """
    proxy = ""  # TODO: set the real proxy address
    request_kwargs = {"headers": headers, "cookies": cookies, "timeout": timeout}
    if is_proxy:
        request_kwargs["proxies"] = {'http': proxy, 'https': proxy}

    try:
        if data:
            response = requests.post(url, data=data, **request_kwargs)
        else:
            response = requests.get(url, **request_kwargs)
        status = response.status_code
    except Exception as e:
        # Best effort: report the failure as status 0 so the retry predicate rejects it.
        response, status = None, 0
        logger.error(f"download {e}")
    return status, response
async def upload_file(file_path, content, url=""):  # TODO:go-fast
    """
    Asynchronously upload a file as a multipart form and return the URL the server reports.

    :param file_path: name sent as the ``filename`` of the uploaded part; must be truthy
    :param content: payload for the ``file`` form field
    :param url: upload endpoint that receives the POST
    :return: the value of the ``url`` key in the server's JSON reply
    :raises Exception: if ``file_path`` is empty, or the reply is not a JSON
        object containing a ``url`` key
    """
    if not file_path:
        raise Exception("upload_file: file_path is empty")

    form = FormData()
    # NOTE(review): the content_type literal ends with a stray double quote; it is
    # kept byte-for-byte because the server's handling of it is unknown - confirm.
    form.add_field("file", content, filename=file_path, content_type='multipart/form-data;charset=utf-8"')
    form.add_field("output", "json")

    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=form) as response:
            # The reply body is a JSON string.
            result = json.loads(await response.text())

    logger.info(f"upload file {result}")
    if not isinstance(result, dict) or 'url' not in result:
        raise Exception(f"upload_file: no 'url' in upload response: {result}")
    return result['url']