|
|
# -*- coding: utf-8 -*-
# 1. [Core] The gevent monkey patch must be applied at the very top, before any
#    other import, so that the service supports concurrent requests.
#    Update: dynamic requests now scroll the page to the bottom.
from gevent import monkey
monkey.patch_all()

import asyncio
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from logging.handlers import RotatingFileHandler
from urllib.parse import urlparse

import requests
from flask import Flask, request, jsonify
from playwright.async_api import async_playwright
app = Flask(__name__)
# === 配置区域 ===# 1. 静态请求代理 (Privoxy - HTTP)STATIC_PROXY_IP = "127.0.0.1"STATIC_PROXY_PORT = "19095"STATIC_PROXIES = { "http": f"http://{STATIC_PROXY_IP}:{STATIC_PROXY_PORT}", "https": f"http://{STATIC_PROXY_IP}:{STATIC_PROXY_PORT}"}
# 2. 动态请求代理 (Tor - SOCKS5)# Playwright 直接连 Tor 端口,效率更高DYNAMIC_PROXY_SERVER = "socks5://127.0.0.1:9050"# 动态采集专用线程池,在线程内运行 async PlaywrightDYNAMIC_MAX_WORKERS = 6DYNAMIC_EXECUTOR = ThreadPoolExecutor(max_workers=DYNAMIC_MAX_WORKERS, thread_name_prefix="pw-dyn")
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0"
# 特定站点可在这里做定制化策略SITE_PROFILES = { "pitchprash4aqilfr7sbmuwve3pnkpylqwxjbj2q5o4szcfeea6d27yd.onion": { "force_dynamic": True, "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "User-Agent": DEFAULT_USER_AGENT, }, "static": { "allow_redirects": True, "timeout_s": 90, "retry_times": 3, "retry_interval_ms": 1200 }, "dynamic": { "initial_wait_ms": 8000, "post_scroll_wait_ms": 3000, "wait_selector": "main, article, .post, .search-results, #content, body", "wait_selector_timeout_ms": 30000, "goto_wait_until": "domcontentloaded", "retry_times": 3, "retry_interval_ms": 1500, "min_content_len": 200 } }}
# 规则匹配:用于批量覆盖“同类页面”,如任意 onion 站的搜索页RULE_PROFILES = [ { "name": "xmh57_prefix_omega", "match": { "hostname_prefix": "xmh57jrknzkhv6y3ls3ubitzfqnkrwxhopf5aygthi7d6rplyvk3noyd", "path_contains": ["/cgi-bin/omega/omega"] }, "profile": { "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Upgrade-Insecure-Requests": "1", "User-Agent": DEFAULT_USER_AGENT }, "static": { "allow_redirects": True, "timeout_s": 120, "retry_times": 4, "retry_interval_ms": 1500, "retry_status_codes": [502, 503, 504], "min_content_len": 120 }, "dynamic": { "initial_wait_ms": 8000, "post_scroll_wait_ms": 2500, "wait_selector": "body, main, #content, .results, .record", "wait_selector_timeout_ms": 25000, "goto_wait_until": "domcontentloaded", "retry_times": 3, "retry_interval_ms": 1500, "retry_status_codes": [502, 503, 504], "min_content_len": 120 } } }, { "name": "generic_onion_search", "match": { "hostname_suffix": ".onion", "path_contains": ["/search"] }, "profile": { "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Upgrade-Insecure-Requests": "1", "User-Agent": DEFAULT_USER_AGENT }, "static": { "allow_redirects": True, "timeout_s": 90, "retry_times": 2, "retry_interval_ms": 1000 }, "dynamic": { "initial_wait_ms": 8000, "post_scroll_wait_ms": 3000, "wait_selector": "main, article, .post, .search-results, #content, body", "wait_selector_timeout_ms": 30000, "goto_wait_until": "domcontentloaded", "retry_times": 2, "retry_interval_ms": 1200, "min_content_len": 120 } } }]
# === Logging setup ===
def setup_logging():
    """Return the 'darkweb_spider' logger wired to rotating log files.

    Two rotating handlers (10 MiB per file, 10 backups, UTF-8) are attached:
    ``logs/info.log`` receives INFO and above, ``logs/error.log`` receives
    ERROR and above. The ``logs`` directory is created when missing.

    The function is idempotent: a handler already attached for one of the two
    files is not attached again, so calling it repeatedly does not duplicate
    log records.

    Returns:
        logging.Logger: the configured logger.
    """
    # exist_ok avoids the check-then-create race when several processes start at once.
    os.makedirs('logs', exist_ok=True)

    spider_logger = logging.getLogger('darkweb_spider')
    spider_logger.setLevel(logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )

    # File handlers store their target as an absolute path in `baseFilename`.
    attached_files = {getattr(h, 'baseFilename', None) for h in spider_logger.handlers}

    for log_path, level in (('logs/info.log', logging.INFO), ('logs/error.log', logging.ERROR)):
        if os.path.abspath(log_path) in attached_files:
            continue
        handler = RotatingFileHandler(
            log_path, maxBytes=10 * 1024 * 1024, backupCount=10, encoding='utf-8'
        )
        handler.setLevel(level)
        handler.setFormatter(formatter)
        spider_logger.addHandler(handler)

    return spider_logger
# 初始化日志对象logger = setup_logging()
def _get_profile_by_url(target_url):
    """Build the effective site profile for ``target_url``.

    The lookup starts from the exact-hostname entry in ``SITE_PROFILES`` (or
    an empty profile) and then layers every matching ``RULE_PROFILES`` entry
    on top, in list order. When the URL cannot be parsed, the hostname is
    treated as empty and the path as "/".
    """
    try:
        parts = urlparse(target_url)
        hostname = (parts.hostname or "").lower()
        path = parts.path or "/"
    except Exception:
        hostname, path = "", "/"

    profile = SITE_PROFILES.get(hostname, {}).copy()

    # Overlay rules: each rule that matches is merged onto the current profile.
    for rule in RULE_PROFILES:
        conditions = rule.get("match", {})
        wanted_suffix = (conditions.get("hostname_suffix") or "").lower()
        wanted_prefix = (conditions.get("hostname_prefix") or "").lower()
        path_tokens = conditions.get("path_contains", [])

        # An empty suffix/prefix matches every hostname.
        if not hostname.endswith(wanted_suffix):
            continue
        if not hostname.startswith(wanted_prefix):
            continue
        # An empty token list means "any path".
        if path_tokens and not any(token in path for token in path_tokens):
            continue

        profile = _merge_profile(profile, rule.get("profile", {}))

    return profile
def _merge_profile(base_profile, extra_profile): """
递归合并 profile,extra_profile 优先 """
merged = dict(base_profile or {}) for key, value in (extra_profile or {}).items(): if isinstance(value, dict) and isinstance(merged.get(key), dict): merged[key] = _merge_profile(merged[key], value) else: merged[key] = value return merged
def _merge_headers(profile_headers, custom_headers): """
合并站点默认头和调用方传入头,后者优先 """
merged = {} if profile_headers: merged.update(profile_headers) if custom_headers: merged.update(custom_headers) if "User-Agent" not in merged: merged["User-Agent"] = DEFAULT_USER_AGENT return merged
def _normalize_cookies(cookie_input): """
支持 dict 或 cookie 字符串两种格式 """
if not cookie_input: return {}
if isinstance(cookie_input, dict): return {str(k): str(v) for k, v in cookie_input.items()}
if isinstance(cookie_input, str): cookie_map = {} parts = [p.strip() for p in cookie_input.split(";") if p.strip()] for part in parts: if "=" not in part: continue k, v = part.split("=", 1) cookie_map[k.strip()] = v.strip() return cookie_map
return {}
def _merge_cookies(base_cookies, more_cookies): """
合并 cookie 字典,后者优先 """
merged = {} if base_cookies: merged.update(base_cookies) if more_cookies: merged.update(more_cookies) return merged
def _extract_cookie_from_headers(headers): """
从 headers 中提取 Cookie,并返回 (clean_headers, cookie_map) """
if not headers: return {}, {}
clean_headers = dict(headers) cookie_value = clean_headers.pop("Cookie", None) if cookie_value is None: cookie_value = clean_headers.pop("cookie", None) cookie_map = _normalize_cookies(cookie_value) return clean_headers, cookie_map
def _sanitize_dynamic_headers(headers): """
Playwright 不建议透传部分连接级头,避免被浏览器内部覆盖后冲突 """
blocked = {"host", "content-length", "connection", "cookie"} return {k: v for k, v in (headers or {}).items() if k.lower() not in blocked}
def _build_dynamic_target_urls(target_url): """
对 onion 站点构建动态访问候选 URL。 优先使用原始 URL;若为 https 的 onion,再尝试回退到 http。 """
candidates = [target_url] try: parsed = urlparse(target_url) hostname = (parsed.hostname or "").lower() if parsed.scheme == "https" and hostname.endswith(".onion"): candidates.append(target_url.replace("https://", "http://", 1)) except Exception: pass return candidates
def _should_retry_status(status_code, retry_status_codes): """
判断状态码是否应重试 """
try: code = int(status_code) except Exception: return False return code in set(retry_status_codes or [])
# === Core method 1: dynamic crawling (Playwright) ===
async def _fetch_dynamic_content_async(target_url, client_ip, runtime_cfg):
    """Fetch a dynamic page with Playwright + Firefox, including auto-scroll.

    A headless Firefox is launched behind ``DYNAMIC_PROXY_SERVER``. Each
    attempt tries every candidate URL from ``_build_dynamic_target_urls``;
    a candidate succeeds when navigation returns a response whose status is
    not retryable and the rendered HTML has at least ``min_content_len``
    characters.

    Args:
        target_url: URL to fetch.
        client_ip: caller address, used for logging only.
        runtime_cfg: dict with optional keys ``headers``, ``cookies`` and
            ``dynamic``. Recognised ``dynamic`` settings (default):
            ``initial_wait_ms`` (5000), ``post_scroll_wait_ms`` (2000),
            ``wait_selector`` (None), ``wait_selector_timeout_ms`` (20000),
            ``goto_timeout_ms`` (90000), ``goto_wait_until`` ("load"),
            ``retry_times`` (1), ``retry_interval_ms`` (1000),
            ``min_content_len`` (1), ``retry_status_codes`` ([502, 503, 504]).

    Returns:
        dict: ``{"status_code": int, "content": str}``.

    Raises:
        Exception: the error of the last failed attempt when every attempt
            and candidate failed; launch/context errors propagate unchanged.
    """
    logger.info(f"[动态] 启动浏览器内核 | 来源IP: {client_ip} | 目标: {target_url}")

    # Monotonic clock: the elapsed time is unaffected by wall-clock adjustments.
    start_time = time.monotonic()
    req_headers = runtime_cfg.get("headers", {})
    req_cookies = runtime_cfg.get("cookies", {})
    dynamic_cfg = runtime_cfg.get("dynamic", {})
    initial_wait_ms = int(dynamic_cfg.get("initial_wait_ms", 5000))
    post_scroll_wait_ms = int(dynamic_cfg.get("post_scroll_wait_ms", 2000))
    wait_selector = dynamic_cfg.get("wait_selector")
    wait_selector_timeout_ms = int(dynamic_cfg.get("wait_selector_timeout_ms", 20000))
    goto_timeout_ms = int(dynamic_cfg.get("goto_timeout_ms", 90000))
    goto_wait_until = dynamic_cfg.get("goto_wait_until", "load")
    retry_times = int(dynamic_cfg.get("retry_times", 1))
    retry_interval_ms = int(dynamic_cfg.get("retry_interval_ms", 1000))
    min_content_len = int(dynamic_cfg.get("min_content_len", 1))
    retry_status_codes = dynamic_cfg.get("retry_status_codes", [502, 503, 504])
    # Connection-level headers and Cookie are not forwarded as extra headers.
    dynamic_headers = _sanitize_dynamic_headers(req_headers)

    async with async_playwright() as p:
        browser = None
        try:
            browser = await p.firefox.launch(
                headless=True,
                proxy={"server": DYNAMIC_PROXY_SERVER}
            )

            context = await browser.new_context(
                user_agent=req_headers.get("User-Agent", DEFAULT_USER_AGENT),
                extra_http_headers=dynamic_headers,
                ignore_https_errors=True
            )
            if req_cookies:
                # Cookies are scoped to the target host rather than sent as a raw header.
                cookie_domain = urlparse(target_url).hostname
                await context.add_cookies([
                    {"name": k, "value": v, "domain": cookie_domain, "path": "/"}
                    for k, v in req_cookies.items()
                ])
            page = await context.new_page()

            last_error = None
            candidate_urls = _build_dynamic_target_urls(target_url)
            total_attempts = max(1, retry_times)

            for attempt in range(1, total_attempts + 1):
                for candidate_url in candidate_urls:
                    logger.info(
                        f"[动态] 第{attempt}/{total_attempts}次访问 | Timeout {goto_timeout_ms / 1000:.0f}s | URL: {candidate_url}"
                    )
                    try:
                        response = await page.goto(candidate_url, timeout=goto_timeout_ms, wait_until=goto_wait_until)
                        if not response:
                            raise Exception("Response is None")

                        # Fail fast on a retryable status: waiting for selectors and
                        # scrolling an error page would only delay the retry.
                        status_code = response.status
                        if _should_retry_status(status_code, retry_status_codes):
                            raise Exception(f"命中可重试状态码 | status_code={status_code}")

                        if wait_selector:
                            try:
                                await page.wait_for_selector(wait_selector, timeout=wait_selector_timeout_ms)
                            except Exception:
                                # Best effort: a missing selector does not abort the attempt.
                                logger.info(f"[动态] 等待选择器超时,继续执行 | selector={wait_selector}")

                        await page.wait_for_timeout(initial_wait_ms)
                        await _auto_scroll(page, logger, candidate_url)
                        await page.wait_for_timeout(post_scroll_wait_ms)

                        html_content = await page.content() or ""
                        if len(html_content) < min_content_len:
                            raise Exception(f"内容过短,疑似未获取到正文 | length={len(html_content)}")

                        cost_time = time.monotonic() - start_time
                        logger.info(
                            f"[动态] 采集成功 | 耗时: {cost_time:.2f}s | 状态码: {status_code} | 长度: {len(html_content)}"
                        )
                        return {
                            "status_code": status_code,
                            "content": html_content,
                        }
                    except Exception as nav_err:
                        # Remember the failure and move on to the next candidate / attempt.
                        last_error = nav_err
                        logger.warning(f"[动态] 当前尝试失败 | URL: {candidate_url} | Error: {str(nav_err)}")

                # Pause between attempts, but not after the final one.
                if attempt < total_attempts:
                    await page.wait_for_timeout(retry_interval_ms)

            if last_error:
                raise last_error
            raise Exception("动态采集失败,未获得有效响应")
        finally:
            # Always release the browser, on success and on failure.
            if browser:
                await browser.close()
def fetch_dynamic_content(target_url, client_ip, runtime_cfg):
    """Run the async Playwright fetch on the dynamic thread pool and wait for it.

    The coroutine is driven by ``asyncio.run`` inside a ``DYNAMIC_EXECUTOR``
    worker, which avoids conflicts between the sync API and an event loop and
    allows several dynamic fetches to run concurrently. Returns whatever the
    coroutine returns and re-raises whatever it raises.
    """
    def _run_in_worker():
        coroutine = _fetch_dynamic_content_async(target_url, client_ip, runtime_cfg)
        return asyncio.run(coroutine)

    pending = DYNAMIC_EXECUTOR.submit(_run_in_worker)
    return pending.result()
# === Core method 2: static crawling (Requests) ===
def fetch_static_content(target_url, client_ip, runtime_cfg):
    """Fetch a static page with ``requests`` through the Privoxy proxy.

    Settings are read from ``runtime_cfg["static"]`` (default):
    ``timeout_s`` (60), ``allow_redirects`` (True), ``retry_times`` (1),
    ``retry_interval_ms`` (1000), ``min_content_len`` (1) and
    ``retry_status_codes`` ([502, 503, 504]). An attempt fails on any request
    error, on a retryable status code, or when the body is shorter than
    ``min_content_len``.

    Returns:
        dict: ``{"status_code": int, "content": str}``.

    Raises:
        Exception: the error of the last failed attempt.
    """
    logger.info(f"[静态] 发起请求 | 来源IP: {client_ip} | 目标: {target_url}")

    headers = runtime_cfg.get("headers", {})
    cookies = runtime_cfg.get("cookies", {})
    static_cfg = runtime_cfg.get("static", {})
    timeout = int(static_cfg.get("timeout_s", 60))
    allow_redirects = bool(static_cfg.get("allow_redirects", True))
    retry_times = int(static_cfg.get("retry_times", 1))
    retry_interval_ms = int(static_cfg.get("retry_interval_ms", 1000))
    min_content_len = int(static_cfg.get("min_content_len", 1))
    retry_status_codes = static_cfg.get("retry_status_codes", [502, 503, 504])

    def _request_once():
        # One GET through the Privoxy proxy, validated against the retry rules.
        resp = requests.get(
            target_url,
            proxies=STATIC_PROXIES,
            headers=headers,
            cookies=cookies,
            timeout=timeout,
            allow_redirects=allow_redirects
        )
        content = resp.text or ""
        if _should_retry_status(resp.status_code, retry_status_codes):
            raise Exception(f"命中可重试状态码 | status_code={resp.status_code}")
        if len(content) < min_content_len:
            raise Exception(f"内容过短,疑似未获取到正文 | length={len(content)}")

        logger.info(f"[静态] 采集成功 | 状态码: {resp.status_code} | URL: {target_url} | 长度: {len(content)}")
        return {
            "status_code": resp.status_code,
            "content": content,
        }

    last_error = None
    total_attempts = max(1, retry_times)
    attempt = 0
    while attempt < total_attempts:
        attempt += 1
        try:
            return _request_once()
        except Exception as req_err:
            last_error = req_err
            logger.warning(f"[静态] 第{attempt}/{total_attempts}次请求失败 | URL: {target_url} | Error: {str(req_err)}")
            # Pause between attempts, but not after the final one.
            if attempt < total_attempts:
                time.sleep(retry_interval_ms / 1000.0)

    if last_error:
        raise last_error
    raise Exception("静态采集失败,未获得有效响应")
# === 新增辅助函数:通用自动滚动逻辑 ===async def _auto_scroll(page, logger, target_url): """
模拟人类滚动到底部,触发懒加载 """
logger.info(f"[动态] 开始自动滚动页面... | URL: {target_url}")
# 最大的滚动次数 (防止无限加载的页面卡死程序) MAX_SCROLLS = 10 # 每次滚动后的等待时间 (暗网建议长一点,3-5秒) WAIT_TIME = 3000
previous_height = await page.evaluate("document.body.scrollHeight")
for i in range(MAX_SCROLLS): # 1. 滚动到当前页面的最底部 await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
# 2. 等待页面加载新内容 (相当于 sleep) await page.wait_for_timeout(WAIT_TIME)
# 3. 获取新的高度 new_height = await page.evaluate("document.body.scrollHeight")
if new_height == previous_height: logger.info(f"[动态] 滚动结束: 高度不再变化 (次数: {i})") break
logger.info(f"[动态] 滚动触发加载: 高度从 {previous_height} 变为 {new_height}") previous_height = new_height else: logger.info(f"[动态] 滚动结束: 达到最大次数限制 ({MAX_SCROLLS})")
# === API route entry point ===
@app.route('/crawl', methods=['POST'])
def crawl_onion():
    """Handle ``POST /crawl``: fetch one URL in static or dynamic mode.

    The JSON body must be an object. Recognised fields: ``url`` (required),
    ``is_dynamic``, ``headers``, ``cookies``, ``static_timeout_s``,
    ``dynamic_timeout_ms``, ``wait_selector``, ``wait_selector_timeout_ms``
    and ``referer``.

    Precedence for the timeout settings is: value sent in the request, then
    the matching site/rule profile, then the route defaults (60 s static
    timeout, 90000 ms navigation timeout, 30000 ms selector timeout). A
    profile with ``force_dynamic`` switches the request to dynamic mode.

    Returns:
        A JSON response: code 200 with ``mode`` and ``data`` (``status_code``,
        ``url``, ``content``) on success; 400 for a missing/invalid body or
        URL; 504 on a requests timeout; 502 on a requests connection error;
        500 for any other failure.
    """
    # 1. Parse parameters
    client_ip = request.remote_addr
    # silent=True: a malformed body or a non-JSON content type yields None, so
    # the caller gets the JSON 400 below instead of Flask's default error page.
    request_data = request.get_json(silent=True)

    if not isinstance(request_data, dict) or not request_data:
        logger.warning(f"请求体错误 | 来源IP: {client_ip}")
        return jsonify({"code": 400, "msg": "请发送 JSON 格式"}), 400

    target_url = request_data.get('url')
    # Dynamic-mode flag; defaults to False (static).
    is_dynamic = request_data.get('is_dynamic', False)
    custom_headers = request_data.get('headers', {})
    custom_cookies = request_data.get('cookies', {})
    # None means "not supplied": the profile value (or the route default) applies.
    static_timeout = request_data.get('static_timeout_s')
    goto_timeout_ms = request_data.get('dynamic_timeout_ms')
    wait_selector = request_data.get('wait_selector')
    wait_selector_timeout_ms = request_data.get('wait_selector_timeout_ms')
    referer = request_data.get('referer')

    if not target_url:
        logger.warning(f"参数缺失 | 来源IP: {client_ip}")
        return jsonify({"code": 400, "msg": "url 不能为空"}), 400

    profile = _get_profile_by_url(target_url)
    if profile.get("force_dynamic"):
        is_dynamic = True
        logger.info(f"[路由] 命中站点强制策略,自动切换动态模式 | URL: {target_url}")

    # Mode label used in logs and in the response.
    mode_tag = "动态" if is_dynamic else "静态"
    merged_headers = _merge_headers(profile.get("headers", {}), custom_headers)
    # A Cookie header is moved into the cookie map so both fetchers handle it uniformly.
    header_without_cookie, header_cookie_map = _extract_cookie_from_headers(merged_headers)
    merged_headers = header_without_cookie
    if referer:
        merged_headers["Referer"] = referer
    merged_cookies = _merge_cookies(header_cookie_map, _normalize_cookies(custom_cookies))

    # Copy the profile sections so per-request overrides never touch shared config.
    static_profile_cfg = dict(profile.get("static", {}))
    if static_timeout is not None:
        static_profile_cfg["timeout_s"] = static_timeout
    else:
        static_profile_cfg.setdefault("timeout_s", 60)

    dynamic_profile_cfg = dict(profile.get("dynamic", {}))
    if wait_selector:
        dynamic_profile_cfg["wait_selector"] = wait_selector
    if wait_selector_timeout_ms is not None:
        dynamic_profile_cfg["wait_selector_timeout_ms"] = wait_selector_timeout_ms
    else:
        dynamic_profile_cfg.setdefault("wait_selector_timeout_ms", 30000)
    if goto_timeout_ms is not None:
        dynamic_profile_cfg["goto_timeout_ms"] = goto_timeout_ms
    else:
        dynamic_profile_cfg.setdefault("goto_timeout_ms", 90000)

    runtime_cfg = {
        "headers": merged_headers,
        "cookies": merged_cookies,
        "static": static_profile_cfg,
        "dynamic": dynamic_profile_cfg
    }

    try:
        # === Dispatch ===
        if is_dynamic:
            # Playwright path
            result_data = fetch_dynamic_content(target_url, client_ip, runtime_cfg)
        else:
            # Requests path
            result_data = fetch_static_content(target_url, client_ip, runtime_cfg)

        # Uniform response format
        return jsonify({
            "code": 200,
            "msg": "success",
            "mode": mode_tag,  # tells the caller which mode was used
            "data": {
                "status_code": result_data.get('status_code'),
                "url": target_url,
                "content": result_data.get('content')
            }
        })

    except requests.exceptions.Timeout:
        logger.error(f"[{mode_tag}] 请求超时 | URL: {target_url}")
        return jsonify({"code": 504, "msg": f"{mode_tag}请求超时"}), 504

    except requests.exceptions.ConnectionError:
        logger.error(f"[{mode_tag}] 代理连接失败 | URL: {target_url}")
        return jsonify({"code": 502, "msg": "代理连接失败,请检查服务状态"}), 502

    except Exception as e:
        # Playwright failures and any other unexpected error end up here.
        logger.error(f"[{mode_tag}] 系统异常 | URL: {target_url} | Error: {str(e)}", exc_info=True)
        return jsonify({"code": 500, "msg": f"系统异常: {str(e)}"}), 500
if __name__ == '__main__':
    # Development entry point only; use Gunicorn to start the service in production.
    app.run(host='0.0.0.0', port=8000)
|