|
|
import time import json
from dianping.mtgsig import mtgsig_run from douyin.dy_abogus import run from hnw.hnw_hook import build_headers from flask import Flask, request import traceback from utils.Logger import MyLogger from wx.video.video_decode import main from xhs.shield import shield_run from xhs.shield_aes import get_key from xhs.xhs_captcha import SignalCaptcha from xhs.xhs_param import get_xs, get_a1, get_comment
app = Flask(__name__) log = MyLogger()
"""
部署 nohup /root/anaconda3/envs/py3.8/bin/python server.py > /dev/null 2>&1 &
新部署 启动:uwsgi -d --ini uwsgi.ini 停止:uwsgi --stop 重启:uwsgi --reload """
@app.route("/getData", methods=['GET']) def get_review_list(): # shop_id = request.query.shopId # page_num = request.query.pageNum # page_num = int(page_num) * 14 # review_list = main(shop_id, str(page_num)) # ret = {"desc": "success", "result": review_list} ret = "" return json.dumps(ret, ensure_ascii=False)
@app.route("/hnw/get_header", methods=['GET']) def get_hnw_headers(): ret = {"code": 200, "data": {}} try: headers = build_headers() ret["data"] = headers except Exception as e: ret["code"] = 500 ret["data"] = f"{e}" log.info(f"[hnw]: {ret}") return json.dumps(ret, ensure_ascii=False)
@app.route("/xhs/get_header", methods=['POST']) def get_xhs_headers(): """
小红书x-s参数接口 参数:请求参数/a1/timeStamp 对于cookie信息如果不传则本地生成一个 :return: """
ret = {"code": 200, "data": {}} try: api_type = request.args.get("api") a1 = request.args.get("a1") if request.args.get("a1") else get_a1("Mac OS")[0] times = request.args.get("timStamp") if request.args.get("timStamp") else str(int(time.time() * 1000)) data = request.get_data() params = json.dumps(json.loads(data), ensure_ascii=False, separators=(',', ':')) res = get_xs(a1, api_type, params, times) ret["data"] = res except Exception as e: log.error(e) ret = {"code": 500, "data": str(e)} log.info(f"[xhs]: {ret}")
return json.dumps(ret, ensure_ascii=False)
@app.route("/xhs/get_header_all", methods=['POST']) def get_xhs_headers_all(): """
小红书x-s、x-comment参数接口 参数:请求参数/a1/timeStamp :return: """
ret = {"code": 200, "data": {}} try: api_type = request.args.get("api") a1 = request.args.get("a1") times = str(int(time.time() * 1000)) data = request.get_data() params = json.dumps(json.loads(data), ensure_ascii=False, separators=(',', ':')) res = get_xs(a1, api_type, params, times) res["x-s-common"] = get_comment(a1, res['x-s'], res['x-t']) ret["data"] = res except Exception as e: log.error(e) ret = {"code": 500, "data": str(e)} log.info(f"[xhs]: {ret}")
return json.dumps(ret, ensure_ascii=False)
@app.route("/xhs/pass_captcha", methods=['POST']) def verify_captcha(): """
xhs验证码 461触发 :return: """
ret = {"code": 200, "data": {}} try: data = request.get_data() data = json.loads(data) log.info(data) web_session = data["web_session"] verify_uuid = data["verify_uuid"] source = data["source"] a1 = data["a1"] webId = data["webId"] is_proxy = data.get("is_proxy") if data.get("is_proxy") else False A = SignalCaptcha(web_session, verify_uuid, a1, webId, source=source, is_proxy=is_proxy) res = A.run() ret["data"] = res except Exception as e: log.error(e) ret = {"code": 500, "data": str(e)}
log.info(f"[xhs]: {ret}")
return json.dumps(ret, ensure_ascii=False)
@app.route("/xhs/get_cookie", methods=['GET']) def get_xhs_cookies(): """
参数:platform 针对平台产生cookie 可能会影响风控 种类:Android/iOS/Mac OS/Linux/Windows(暂时没发现用) :return: """
ret = {"code": 200, "data": {}} try: platform = request.args.get("platform") if request.args.get("platform") else "Mac OS" a1, web_id = get_a1(platform) ret["data"] = { "a1": a1, "webId": web_id } except Exception as e: ret["code"] = 500 ret["data"] = f"{e}"
log.info(f"[xhs]: {ret}") return json.dumps(ret, ensure_ascii=False)
@app.route("/wx/decode", methods=["POST"]) async def get_upload_path(): """
读取视频流 :return: """
if 'file' not in request.files: return json.dumps({"code": 500, "msg": "视频流文件不存在"}, ensure_ascii=False) file = request.files["file"] decode = request.args.get("decodekey") log.info(f"[wx]: 获取的decode {decode}") upload_path = await main(file, decode) log.info(f"[wx]: {upload_path}") return json.dumps(upload_path, ensure_ascii=False)
@app.route("/wx/decode", methods=["GET"]) async def get_upload_path1(): """
读取本地文件 :return: """
decode = request.args.get("decodekey") localpath = request.args.get("localpath") try: files = {'file': open(localpath, 'rb')} except Exception as e: log.error(f"[wx] error {e}") return json.dumps({"code":500, "msg": "视频读取失败"}, ensure_ascii=False)
log.info(f"[wx]: 获取的decode {decode}") upload_path = await main(files["file"], decode) log.info(f"[wx]: {upload_path}") return json.dumps(upload_path, ensure_ascii=False)
@app.route("/dy/get_abogus", methods=["POST"]) async def get_abogus(): """
读取本地文件 :return: """
try: data = json.loads(request.get_data()) ua = data.get("ua") param = data.get("param") a_bogus = run(param, ua) res = {"code": 200, "data": a_bogus} log.info(f"[dy]: {len(a_bogus)} => {res}") return json.dumps(res, ensure_ascii=False) except Exception as e: log.error(f"[dy] error {e}") return json.dumps({"code":500, "msg": "算法生成失败"}, ensure_ascii=False)
@app.route("/dp/get_mtgsig", methods=["POST"]) async def get_mtgsig(): """
获取小程序mtgsig1.2 :return: """
try: data = json.loads(request.get_data()) url = data.get("url") a3 = data.get("a3") b8 = data.get("b8") # 循环的次数 a6 = data.get("a6") if not b8 or not a6 or not a3: raise Exception("check b8/a6/a3 params!!!") mtgsig = mtgsig_run(url, b8, a3, a6) res = {"code": 200, "data": mtgsig} log.info(f"[mtgsig]: => {res}") return json.dumps(res, ensure_ascii=False) except Exception as e: log.error(f"[mtgsig] error {e}") return json.dumps({"code":500, "msg": "算法生成失败"}, ensure_ascii=False)
@app.route("/xhs/get_shield", methods=['POST']) async def get_app_xhs_headers(): """
小红书app端 shield本地生成 :return: """
ret = {"code": 200, "data": {}} try: data = request.get_data() data_json = json.loads(data) hmac_main = data_json["hmac_main"] url = data_json["url"] xy_common_params = data_json["xy_common_params"] deviceId = data_json["deviceId"] api = data_json["api"] # /api/sns/v1/note/feed 默认 keys = get_key(deviceId, hmac_main) # 获得keys res = shield_run(url, keys, xy_common_params, deviceId, api) # 加密流程 ret["data"] = res except Exception as e: log.error(e) traceback.print_exc() ret = {"code": 500, "data": str(e)} log.info(f"[xhs]: {ret}")
return json.dumps(ret, ensure_ascii=False)
if __name__ == '__main__': app.run(host='0.0.0.0', port=8088)
|