You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
266 lines
7.7 KiB
266 lines
7.7 KiB
import time
|
|
import json
|
|
|
|
from dianping.mtgsig import mtgsig_run
|
|
from douyin.dy_abogus import run
|
|
from hnw.hnw_hook import build_headers
|
|
from flask import Flask, request
|
|
import traceback
|
|
from utils.Logger import MyLogger
|
|
from wx.video.video_decode import main
|
|
from xhs.shield import shield_run
|
|
from xhs.shield_aes import get_key
|
|
from xhs.xhs_captcha import SignalCaptcha
|
|
from xhs.xhs_param import get_xs, get_a1, get_comment
|
|
|
|
app = Flask(__name__)
|
|
log = MyLogger()
|
|
|
|
|
|
"""
|
|
部署
|
|
nohup /root/anaconda3/envs/py3.8/bin/python server.py > /dev/null 2>&1 &
|
|
|
|
新部署
|
|
启动:uwsgi -d --ini uwsgi.ini
|
|
停止:uwsgi --stop
|
|
重启:uwsgi --reload
|
|
"""
|
|
|
|
|
|
@app.route("/getData", methods=['GET'])
|
|
def get_review_list():
|
|
# shop_id = request.query.shopId
|
|
# page_num = request.query.pageNum
|
|
# page_num = int(page_num) * 14
|
|
# review_list = main(shop_id, str(page_num))
|
|
# ret = {"desc": "success", "result": review_list}
|
|
ret = ""
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/hnw/get_header", methods=['GET'])
|
|
def get_hnw_headers():
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
headers = build_headers()
|
|
ret["data"] = headers
|
|
except Exception as e:
|
|
ret["code"] = 500
|
|
ret["data"] = f"{e}"
|
|
log.info(f"[hnw]: {ret}")
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/xhs/get_header", methods=['POST'])
|
|
def get_xhs_headers():
|
|
"""
|
|
小红书x-s参数接口
|
|
参数:请求参数/a1/timeStamp
|
|
对于cookie信息如果不传则本地生成一个
|
|
:return:
|
|
"""
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
api_type = request.args.get("api")
|
|
a1 = request.args.get("a1") if request.args.get("a1") else get_a1("Mac OS")[0]
|
|
times = request.args.get("timStamp") if request.args.get("timStamp") else str(int(time.time() * 1000))
|
|
data = request.get_data()
|
|
params = json.dumps(json.loads(data), ensure_ascii=False, separators=(',', ':'))
|
|
res = get_xs(a1, api_type, params, times)
|
|
ret["data"] = res
|
|
except Exception as e:
|
|
log.error(e)
|
|
ret = {"code": 500, "data": str(e)}
|
|
log.info(f"[xhs]: {ret}")
|
|
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/xhs/get_header_all", methods=['POST'])
|
|
def get_xhs_headers_all():
|
|
"""
|
|
小红书x-s、x-comment参数接口
|
|
参数:请求参数/a1/timeStamp
|
|
:return:
|
|
"""
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
api_type = request.args.get("api")
|
|
a1 = request.args.get("a1")
|
|
times = str(int(time.time() * 1000))
|
|
data = request.get_data()
|
|
params = json.dumps(json.loads(data), ensure_ascii=False, separators=(',', ':'))
|
|
res = get_xs(a1, api_type, params, times)
|
|
res["x-s-common"] = get_comment(a1, res['x-s'], res['x-t'])
|
|
ret["data"] = res
|
|
except Exception as e:
|
|
log.error(e)
|
|
ret = {"code": 500, "data": str(e)}
|
|
log.info(f"[xhs]: {ret}")
|
|
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/xhs/pass_captcha", methods=['POST'])
|
|
def verify_captcha():
|
|
"""
|
|
xhs验证码
|
|
461触发
|
|
:return:
|
|
"""
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
data = request.get_data()
|
|
data = json.loads(data)
|
|
log.info(data)
|
|
web_session = data["web_session"]
|
|
verify_uuid = data["verify_uuid"]
|
|
source = data["source"]
|
|
a1 = data["a1"]
|
|
webId = data["webId"]
|
|
is_proxy = data.get("is_proxy") if data.get("is_proxy") else False
|
|
A = SignalCaptcha(web_session, verify_uuid, a1, webId, source=source, is_proxy=is_proxy)
|
|
res = A.run()
|
|
ret["data"] = res
|
|
except Exception as e:
|
|
log.error(e)
|
|
ret = {"code": 500, "data": str(e)}
|
|
|
|
log.info(f"[xhs]: {ret}")
|
|
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/xhs/get_cookie", methods=['GET'])
|
|
def get_xhs_cookies():
|
|
"""
|
|
参数:platform 针对平台产生cookie 可能会影响风控
|
|
种类:Android/iOS/Mac OS/Linux/Windows(暂时没发现用)
|
|
:return:
|
|
"""
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
platform = request.args.get("platform") if request.args.get("platform") else "Mac OS"
|
|
a1, web_id = get_a1(platform)
|
|
ret["data"] = {
|
|
"a1": a1,
|
|
"webId": web_id
|
|
}
|
|
except Exception as e:
|
|
ret["code"] = 500
|
|
ret["data"] = f"{e}"
|
|
|
|
log.info(f"[xhs]: {ret}")
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/wx/decode", methods=["POST"])
|
|
async def get_upload_path():
|
|
"""
|
|
读取视频流
|
|
:return:
|
|
"""
|
|
if 'file' not in request.files:
|
|
return json.dumps({"code": 500, "msg": "视频流文件不存在"}, ensure_ascii=False)
|
|
file = request.files["file"]
|
|
decode = request.args.get("decodekey")
|
|
log.info(f"[wx]: 获取的decode {decode}")
|
|
upload_path = await main(file, decode)
|
|
log.info(f"[wx]: {upload_path}")
|
|
return json.dumps(upload_path, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/wx/decode", methods=["GET"])
|
|
async def get_upload_path1():
|
|
"""
|
|
读取本地文件
|
|
:return:
|
|
"""
|
|
decode = request.args.get("decodekey")
|
|
localpath = request.args.get("localpath")
|
|
try:
|
|
files = {'file': open(localpath, 'rb')}
|
|
except Exception as e:
|
|
log.error(f"[wx] error {e}")
|
|
return json.dumps({"code":500, "msg": "视频读取失败"}, ensure_ascii=False)
|
|
|
|
log.info(f"[wx]: 获取的decode {decode}")
|
|
upload_path = await main(files["file"], decode)
|
|
log.info(f"[wx]: {upload_path}")
|
|
return json.dumps(upload_path, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/dy/get_abogus", methods=["POST"])
|
|
async def get_abogus():
|
|
"""
|
|
读取本地文件
|
|
:return:
|
|
"""
|
|
|
|
try:
|
|
data = json.loads(request.get_data())
|
|
ua = data.get("ua")
|
|
param = data.get("param")
|
|
a_bogus = run(param, ua)
|
|
res = {"code": 200, "data": a_bogus}
|
|
log.info(f"[dy]: {len(a_bogus)} => {res}")
|
|
return json.dumps(res, ensure_ascii=False)
|
|
except Exception as e:
|
|
log.error(f"[dy] error {e}")
|
|
return json.dumps({"code":500, "msg": "算法生成失败"}, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/dp/get_mtgsig", methods=["POST"])
|
|
async def get_mtgsig():
|
|
"""
|
|
获取小程序mtgsig1.2
|
|
:return:
|
|
"""
|
|
try:
|
|
data = json.loads(request.get_data())
|
|
url = data.get("url")
|
|
a3 = data.get("a3")
|
|
b8 = data.get("b8") # 循环的次数
|
|
a6 = data.get("a6")
|
|
if not b8 or not a6 or not a3:
|
|
raise Exception("check b8/a6/a3 params!!!")
|
|
mtgsig = mtgsig_run(url, b8, a3, a6)
|
|
res = {"code": 200, "data": mtgsig}
|
|
log.info(f"[mtgsig]: => {res}")
|
|
return json.dumps(res, ensure_ascii=False)
|
|
except Exception as e:
|
|
log.error(f"[mtgsig] error {e}")
|
|
return json.dumps({"code":500, "msg": "算法生成失败"}, ensure_ascii=False)
|
|
|
|
|
|
@app.route("/xhs/get_shield", methods=['POST'])
|
|
async def get_app_xhs_headers():
|
|
"""
|
|
小红书app端
|
|
shield本地生成
|
|
:return:
|
|
"""
|
|
ret = {"code": 200, "data": {}}
|
|
try:
|
|
data = request.get_data()
|
|
data_json = json.loads(data)
|
|
hmac_main = data_json["hmac_main"]
|
|
url = data_json["url"]
|
|
xy_common_params = data_json["xy_common_params"]
|
|
deviceId = data_json["deviceId"]
|
|
api = data_json["api"] # /api/sns/v1/note/feed 默认
|
|
keys = get_key(deviceId, hmac_main) # 获得keys
|
|
res = shield_run(url, keys, xy_common_params, deviceId, api) # 加密流程
|
|
ret["data"] = res
|
|
except Exception as e:
|
|
log.error(e)
|
|
traceback.print_exc()
|
|
ret = {"code": 500, "data": str(e)}
|
|
log.info(f"[xhs]: {ret}")
|
|
|
|
return json.dumps(ret, ensure_ascii=False)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host='0.0.0.0', port=8088)
|