算法暴露接口(xhs、dy、ks、wx、hnw)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

336 lines
16 KiB

# coding:utf-8
import json
import time
import os, sys
import argparse
# 相对路径补充
root_path = os.path.abspath(os.path.dirname(__file__)).split('api-py')[0] + "api-py"
sys.path.append(root_path)
from concurrent.futures._base import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.parse import urlparse, quote_plus, parse_qs
from utils.MysqlData import MysqlPoolClient, CRAWLER_DB_CONF_KS
from kuaishou.ks_http import download_q, BaseHeaders, BaseParam, download_pic, BaseURL
from utils.Logger import MyLogger
from loguru import logger
from kuaishou.ks_make_trace import Generate_trajectory
from utils.ImageHelper import recognize_gap
import traceback
class KSSlide(object):
    """
    Kuaishou slider-captcha solver.

    One pass (see ``run``) fetches a ``did`` cookie, triggers the captcha
    through the graphql endpoint, downloads the slider pictures, builds and
    encrypts the verification parameters, submits them, and on success
    queues an INSERT statement in ``self.sql_list``.
    """

    def __init__(self, count=5, is_proxy=False):
        """
        :param count: stored on the instance; not read by any method of this class.
        :param is_proxy: forwarded to ``download_q`` for most requests.
        """
        self.is_proxy = is_proxy
        self.count = count
        self.did_url = "https://www.kuaishou.com/short-video/3xdhjtb9xs7xpaw?authorId=3xsdu49r65skedk&streamSource=profile&area=profilexxnull"
        self.api_url = "https://www.kuaishou.com/graphql"
        self.headers = BaseHeaders.HEADERS.value
        self.doc_headers = BaseHeaders.DOC_HEADERS.value
        self.pic_headers = BaseHeaders.PIC_HEADERS.value
        self.verify_headers = BaseHeaders.VERIFY_HEADERS.value
        self.did = ""
        self.captchaSession = ""
        self.captcha_refer = ""
        # INSERT statements collected by successful run() calls.
        self.sql_list = []
        # Request templates keyed by the ``type_`` argument of get_session().
        self.post_data = {
            "search": {
                "referer": "",
                "data": {
                    "operationName": "visionSearchPhoto",
                    "variables": {
                        "keyword": "f1",
                        "pcursor": "",
                        "page": "search"
                    },
                    "query": "fragment photoContent on PhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n}\n\nfragment recoPhotoFragment on recoPhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n}\n\nfragment feedContent on Feed {\n type\n author {\n id\n name\n headerUrl\n following\n headerUrls {\n url\n __typename\n }\n __typename\n }\n photo {\n ...photoContent\n ...recoPhotoFragment\n __typename\n }\n canAddComment\n llsid\n status\n currentPcursor\n tags {\n type\n name\n __typename\n }\n __typename\n}\n\nquery visionSearchPhoto($keyword: String, $pcursor: String, $searchSessionId: String, $page: String, $webPageArea: String) {\n visionSearchPhoto(keyword: $keyword, pcursor: $pcursor, searchSessionId: $searchSessionId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n ...feedContent\n __typename\n }\n searchSessionId\n pcursor\n aladdinBanner {\n imgUrl\n link\n __typename\n }\n __typename\n }\n}\n"
                },
            },
            "video": {},
            "user": {
                "referer": "https://www.kuaishou.com/profile/3xsdu49r65skedk",
                "data": {
                    "operationName": "visionProfilePhotoList",
                    "variables": {
                        "userId": "3xsdu49r65skedk",
                        "pcursor": "",
                        "page": "profile"
                    },
                    "query": "fragment photoContent on PhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n}\n\nfragment recoPhotoFragment on recoPhotoEntity {\n __typename\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n}\n\nfragment feedContent on Feed {\n type\n author {\n id\n name\n headerUrl\n following\n headerUrls {\n url\n __typename\n }\n __typename\n }\n photo {\n ...photoContent\n ...recoPhotoFragment\n __typename\n }\n canAddComment\n llsid\n status\n currentPcursor\n tags {\n type\n name\n __typename\n }\n __typename\n}\n\nquery visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n ...feedContent\n __typename\n }\n hostName\n pcursor\n __typename\n }\n}\n"
                }
            },
            "comment": {
                "referer": "https://www.kuaishou.com/short-video/3xdhjtb9xs7xpaw?authorId=3xsdu49r65skedk&streamSource=profile&area=profilexxnull",
                "data": {
                    "operationName": "commentListQuery",
                    "variables": {
                        "photoId": "3xdhjtb9xs7xpaw",
                        "pcursor": ""
                    },
                    "query": "query commentListQuery($photoId: String, $pcursor: String) {\n visionCommentList(photoId: $photoId, pcursor: $pcursor) {\n commentCount\n pcursor\n rootComments {\n commentId\n authorId\n authorName\n content\n headurl\n timestamp\n likedCount\n realLikedCount\n liked\n status\n authorLiked\n subCommentCount\n subCommentsPcursor\n subComments {\n commentId\n authorId\n authorName\n content\n headurl\n timestamp\n likedCount\n realLikedCount\n liked\n status\n authorLiked\n replyToUserName\n replyTo\n __typename\n }\n __typename\n }\n __typename\n }\n}\n"
                }
            }
        }

    def get_did(self):
        """
        Request the single-video page and read the ``did`` cookie from the response.

        :return: the ``did`` value (also stored on ``self.did``).
        :raises KeyError: if the response carries no ``did`` cookie.
        """
        response = download_q(self.did_url, self.doc_headers, {}, is_proxy=self.is_proxy)
        self.did = response.cookies["did"]
        logger.info(f"获得did -> {self.did}")
        return self.did

    def get_session(self, did, token="", type_="user"):
        """
        Post one of the ``self.post_data`` templates and extract the captcha session.

        :param did: ``did`` cookie value; when empty, ``self.did`` is used instead.
        :param token: captcha token; when set it is sent in the identity
            verification headers.
        :param type_: key into ``self.post_data`` (default ``"user"``). The
            ``"video"`` entry is empty and would raise ``KeyError``.
        :return: ``(captcha_url, captcha_session)``, or ``(None, None)`` when no
            ``did`` is available or the response does not ask for a captcha.
        """
        did = did or self.did
        if not did:
            # Always hand back a pair: run() unpacks the result into two names.
            return None, None
        self.did = did

        headers = self.headers.copy()
        headers["Referer"] = self.post_data[type_]["referer"]
        if token:
            # NOTE(review): source indentation was lost; both identity headers
            # are assumed to belong under the token check - confirm.
            headers["Identity-Verification-Token"] = token
            headers["Identity-verification-type"] = "2"
        data = json.dumps(self.post_data[type_]["data"], separators=(',', ':'))
        cookies = {
            "kpf": "PC_WEB",
            "kpn": "KUAISHOU_VISION",
            "clientid": "3",
            "did": self.did
        }
        response = download_q(self.api_url, headers, cookies, data=data, is_proxy=self.is_proxy)
        res = response.json()
        payload = res.get("data") or {}

        captcha = payload.get("captcha")
        if captcha:
            # Captcha required (not the 400002 case; that status is ignored for now).
            captchaSession_url = captcha["url"]
        elif payload.get("result"):
            logger.warning("验证码异常 !400002 由于评论不出验证码 暂时放行! ")
            captchaSession_url = payload["url"]
        else:
            # Data returned (or empty): not a captcha response.
            profile = payload.get("visionProfilePhotoList") or {}
            if profile.get("result") == 1:  # only meaningful for the "user" template
                logger.info(f"did 有效; 携带 token {token}")
            else:
                logger.warning(f"did 无效 {res}; 携带 token {token}")
            return None, None

        params = parse_qs(urlparse(captchaSession_url).query)
        self.captchaSession = params["captchaSession"][0]
        self.captcha_refer = captchaSession_url
        return self.captcha_refer, self.captchaSession

    def get_config(self, did, Referer, captchaSession, is_save=False):
        """
        Fetch the slider configuration and pictures, then build the verify parameter.

        :param did: ``did`` cookie value.
        :param Referer: captcha page URL used as the Referer header.
        :param captchaSession: session id returned by ``get_session``.
        :param is_save: when true the pictures are written under ``tmp/`` and
            their paths are passed on; otherwise the raw bytes are passed on.
        :return: the encrypted verify parameter, or ``None`` when the config
            response lacks the serial number or a picture URL.
        """
        headers = self.verify_headers.copy()
        headers["Referer"] = Referer
        cookies = {
            "did": did
        }
        url = "https://captcha.zt.kuaishou.com/rest/zt/captcha/sliding/config"
        response = download_q(url, headers, cookies, data={"captchaSession": captchaSession},
                              is_proxy=self.is_proxy)
        data = response.json()

        captchaSn = data.get("captchaSn")
        bg_url = data.get("bgPicUrl")
        cut_url = data.get("cutPicUrl")
        if not (captchaSn and bg_url and cut_url):
            logger.error(f"captcha config incomplete -> {data}")
            return None
        bgPicUrl = bg_url + f"?captchaSn={captchaSn}"
        cutPicUrl = cut_url + f"?captchaSn={captchaSn}"

        # Work on a copy: self.pic_headers is the dict shared through BaseHeaders.
        pic_headers = self.pic_headers.copy()
        pic_headers["Referer"] = Referer
        bgContent = download_q(bgPicUrl, pic_headers, cookies).content
        fgcontent = download_q(cutPicUrl, pic_headers, cookies).content

        if is_save:
            timeStamp = str(int(time.time()))
            bg_path = f"tmp/{timeStamp}_bg_pic.jpg"
            fg_path = f"tmp/{timeStamp}_fg_pic.jpg"
            download_pic(bg_path, bgContent)
            download_pic(fg_path, fgcontent)
        else:
            bg_path = bgContent
            fg_path = fgcontent
        disY = data.get("disY")
        return self.build_param(bg_path, fg_path, captchaSn, disY)

    @staticmethod
    def _build_plaintext(param):
        """
        Join the verify fields as ``key=value`` pairs separated by ``&``.

        Values are URL-quoted with spaces encoded as ``%20``; fields are
        emitted in a fixed order.
        """
        concat_order = ["captchaSn", "bgDisWidth", "bgDisHeight", "cutDisWidth", "cutDisHeight",
                        "relativeX", "relativeY", "trajectory", "gpuInfo", "captchaExtraParam"]
        return "&".join(
            k + "=" + quote_plus(str(param[k])).replace("+", "%20") for k in concat_order
        )

    def build_param(self, bg, fg, captchaSn, y):
        """
        Build the verification parameter, including a simulated drag trajectory.

        :param bg: background picture (bytes or path) for ``recognize_gap``.
        :param fg: slider-piece picture (bytes or path) for ``recognize_gap``.
        :param captchaSn: captcha serial number from the config response.
        :param y: ``disY`` from the config response.
        :return: the encrypted parameter returned by ``encrypt``.
        """
        distance = recognize_gap(bg, fg)
        relativeX = int(distance * 0.46)  # scale to display size
        # 1.76 / 1.764 were measured as stable for now.
        trajectory = Generate_trajectory().get_slide_track(int(distance * 1.76))
        logger.info(f"缩放距离为 -> {relativeX}")
        # Copy the template so concurrent runs do not overwrite each other's
        # fields in the shared BaseParam dict.
        param = dict(BaseParam.VERIFY_PARAM.value)
        param["captchaSn"] = captchaSn
        param["relativeX"] = relativeX
        param["relativeY"] = int(y * 0.46)  # y from the config endpoint, scaled (original note: 136 * 56/122)
        param["trajectory"] = trajectory
        # param["gpuInfo"] = ""  # TODO: should be randomized
        # param["captchaExtraParam"] = ""
        return KSSlide.encrypt(KSSlide._build_plaintext(param))

    @staticmethod
    def encrypt(info):
        """
        Send ``info`` to the node-js service at ``BaseURL.NodeURL`` and return the response text.
        """
        url = BaseURL.NodeURL.value
        data = {
            "info": info
        }
        response = download_q(url, {}, {}, data=data)
        return response.text

    def verify(self, verifyParam, did, refer):
        """
        Submit the encrypted parameter to the captcha verify endpoint.

        :param verifyParam: value produced by ``get_config`` / ``build_param``.
        :param did: ``did`` cookie value.
        :param refer: captcha page URL used as the Referer header.
        :return: ``captchaToken`` from the response, or ``None`` when absent.
        """
        headers = self.verify_headers.copy()
        headers["Referer"] = refer
        headers["Content-Type"] = "application/json"
        cookies = {
            "did": did
        }
        url = "https://captcha.zt.kuaishou.com/rest/zt/captcha/sliding/kSecretApiVerify"
        data = json.dumps({"verifyParam": verifyParam}, separators=(',', ':'))
        response = download_q(url, headers, cookies, data=data, is_proxy=self.is_proxy)
        res = response.json()  # parse once, then log and read from the same dict
        logger.info(f"verify 结果 -> {res}")
        return res.get("captchaToken", None)

    def run(self):
        """
        Entry point: run one full captcha pass.

        On success an INSERT statement for ``(did, token)`` is appended to
        ``self.sql_list``. Any exception is logged and swallowed, so callers
        can invoke this repeatedly.
        """
        try:
            did = self.get_did()
            referer, session = self.get_session(did)
            if not session:
                logger.error("获得 session 失败")
                return
            verifyParam = self.get_config(did, referer, session)
            if not verifyParam:
                logger.error("node 生成 参数失败")
                return
            token = self.verify(verifyParam, did, referer)
            if not token:
                logger.error("verify 失败")
                return
            logger.success(f"验证码成功: -> {did}")
            # NOTE(review): SQL is assembled by string formatting from values
            # returned by the remote server; switch to a parameterized query
            # if the MysqlPoolClient API supports one.
            sql_demo = "INSERT INTO `ksCookie`(`cookie`, `token`) VALUES('%s', '%s');" % (did, token)
            self.sql_list.append(sql_demo)
            # Re-query with the token; get_session logs whether the did is valid.
            self.get_session(did=did, token=token, type_="user")
        except Exception as e:
            logger.error(e)
            traceback.print_exc()
# def write_file(l):
# with open("webdid.txt", "w") as f:
# f.write("\n".join(l))
# f.close()
# logger.info("文件保存成功")
def insert_data(sql_list):
    """
    Execute each collected SQL statement against the crawler database.

    A failing statement is logged together with its exception and does not
    stop the remaining statements.

    :param sql_list: iterable of complete SQL statement strings.
    :return: None
    """
    if not sql_list:
        # Nothing to store: skip creating a database client.
        return
    client = MysqlPoolClient(CRAWLER_DB_CONF_KS)
    for sql in sql_list:
        try:
            # NOTE(review): getOne is what the original used to run the INSERT;
            # whether it commits is not visible from this file - confirm.
            client.getOne(sql)
        except Exception as e:
            logger.error(f"insert cookie -> {sql}; error: {e}")
        else:
            # Report success only after the statement ran without raising.
            logger.success(f"insert cookie -> {sql}")
def create_by_thread(slid, count):
    """
    Run ``slid.run`` concurrently, then store and summarize the results.

    ``count * 2`` runs are submitted to a two-worker thread pool.

    :param slid: object exposing ``run()`` and a ``sql_list`` attribute.
    :param count: number of cookies wanted.
    :return: None
    """
    total = count * 2
    # Leaving the ``with`` block waits for every submitted run to finish.
    with ThreadPoolExecutor(max_workers=2) as pool:
        for _ in range(total):
            pool.submit(slid.run)
    # Use the argument, not a module-level global, so the function also works
    # when imported from another module.
    insert_data(slid.sql_list)
    produced = len(slid.sql_list)
    rate = produced / total if total > 0 else 0  # avoid dividing by zero when count <= 0
    logger.info(f"[sum] 并发任务 需要生成数量 {count}, 实际抓取数量 {total}, 实际生成数量 {produced}, 成功率 {rate}")
def create_by_for(slid, count, max_attempts=100):
    """
    Run ``slid.run`` sequentially until enough statements are collected.

    The loop stops once ``slid.sql_list`` holds at least ``count`` entries or
    after ``max_attempts`` runs, whichever comes first.

    :param slid: object exposing ``run()`` and a ``sql_list`` attribute.
    :param count: number of cookies wanted.
    :param max_attempts: upper bound on the number of runs (default 100).
    :return: None
    """
    attempts = 0
    while attempts < max_attempts and len(slid.sql_list) < count:
        slid.run()
        attempts += 1
    # Use the argument, not a module-level global, so the function also works
    # when imported from another module.
    insert_data(slid.sql_list)
    produced = len(slid.sql_list)
    rate = produced / attempts if attempts else 0  # no attempt is made when count <= 0
    logger.info(f"[sum] 循环任务 需要生成数量 {count}, 实际抓取数量 {attempts}, 实际生成数量 {produced}, 成功率 {rate}")
if __name__ == '__main__':
    # Command-line entry: -c is the number of cookies wanted, -m selects the runner.
    # The module-level names `slide` and `log` are kept as in the original;
    # other code in this file may refer to them.
    slide = KSSlide(is_proxy=True)
    log = MyLogger()
    method = {
        "0": create_by_for,
        "1": create_by_thread
    }
    parser = argparse.ArgumentParser(description='pass kuaishou cookie slider')
    parser.add_argument('-c', type=int, default=10, help="needed cookie count;default count=10;")
    # `choices` makes argparse reject an unknown method with a usage error
    # instead of failing later with a KeyError.
    parser.add_argument('-m', type=str, default="0", choices=sorted(method),
                        help="method: {0:for, 1:thread}; default method=0;")
    args = parser.parse_args()
    method[args.m](slide, args.c)  # run the selected function