暗网采集的部署
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

605 lines
22 KiB

  1. # -*- coding: utf-8 -*-
  2. # 1. 【核心】必须在最开头打猴子补丁,支持异步并发 更新:增加了动态请求,将页面拉到最底部的操作
  3. from gevent import monkey
  4. monkey.patch_all()
  5. import os
  6. import logging
  7. import time
  8. import asyncio
  9. from concurrent.futures import ThreadPoolExecutor
  10. from urllib.parse import urlparse
  11. from logging.handlers import RotatingFileHandler
  12. from flask import Flask, request, jsonify
  13. import requests
  14. from playwright.async_api import async_playwright
app = Flask(__name__)

# === Configuration ===
# 1. Proxy for static requests (Privoxy - HTTP)
STATIC_PROXY_IP = "127.0.0.1"
STATIC_PROXY_PORT = "19095"
# Both schemes are routed through the same HTTP proxy endpoint.
STATIC_PROXIES = {
    "http": f"http://{STATIC_PROXY_IP}:{STATIC_PROXY_PORT}",
    "https": f"http://{STATIC_PROXY_IP}:{STATIC_PROXY_PORT}"
}

# 2. Proxy for dynamic requests (Tor - SOCKS5)
# Playwright connects to the Tor port directly, which is more efficient.
DYNAMIC_PROXY_SERVER = "socks5://127.0.0.1:9050"

# Dedicated thread pool for dynamic crawling; async Playwright runs inside
# these worker threads.
DYNAMIC_MAX_WORKERS = 6
DYNAMIC_EXECUTOR = ThreadPoolExecutor(max_workers=DYNAMIC_MAX_WORKERS, thread_name_prefix="pw-dyn")

# User-Agent used whenever neither the profile nor the caller provides one.
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0"

# Per-site custom strategies can be defined here.
# Keys are hostnames (looked up in lower case by _get_profile_by_url).
# "force_dynamic" makes the /crawl route switch to the Playwright path;
# "static" / "dynamic" hold the tuning knobs read by the two fetchers.
SITE_PROFILES = {
    "pitchprash4aqilfr7sbmuwve3pnkpylqwxjbj2q5o4szcfeea6d27yd.onion": {
        "force_dynamic": True,
        "headers": {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": DEFAULT_USER_AGENT,
        },
        "static": {
            "allow_redirects": True,
            "timeout_s": 90,
            "retry_times": 3,
            "retry_interval_ms": 1200
        },
        "dynamic": {
            "initial_wait_ms": 8000,
            "post_scroll_wait_ms": 3000,
            "wait_selector": "main, article, .post, .search-results, #content, body",
            "wait_selector_timeout_ms": 30000,
            "goto_wait_until": "domcontentloaded",
            "retry_times": 3,
            "retry_interval_ms": 1500,
            "min_content_len": 200
        }
    }
}

# Rule matching: used to cover "pages of the same kind" in bulk, e.g. the
# search page of any onion site.
# Each rule's "match" may contain "hostname_suffix", "hostname_prefix" and
# "path_contains" (list of substrings, any one of which must occur in the
# URL path); the rule's "profile" is merged on top of the site profile.
RULE_PROFILES = [
    {
        "name": "xmh57_prefix_omega",
        "match": {
            "hostname_prefix": "xmh57jrknzkhv6y3ls3ubitzfqnkrwxhopf5aygthi7d6rplyvk3noyd",
            "path_contains": ["/cgi-bin/omega/omega"]
        },
        "profile": {
            "headers": {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": DEFAULT_USER_AGENT
            },
            "static": {
                "allow_redirects": True,
                "timeout_s": 120,
                "retry_times": 4,
                "retry_interval_ms": 1500,
                "retry_status_codes": [502, 503, 504],
                "min_content_len": 120
            },
            "dynamic": {
                "initial_wait_ms": 8000,
                "post_scroll_wait_ms": 2500,
                "wait_selector": "body, main, #content, .results, .record",
                "wait_selector_timeout_ms": 25000,
                "goto_wait_until": "domcontentloaded",
                "retry_times": 3,
                "retry_interval_ms": 1500,
                "retry_status_codes": [502, 503, 504],
                "min_content_len": 120
            }
        }
    },
    {
        "name": "generic_onion_search",
        "match": {
            "hostname_suffix": ".onion",
            "path_contains": ["/search"]
        },
        "profile": {
            "headers": {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": DEFAULT_USER_AGENT
            },
            "static": {
                "allow_redirects": True,
                "timeout_s": 90,
                "retry_times": 2,
                "retry_interval_ms": 1000
            },
            "dynamic": {
                "initial_wait_ms": 8000,
                "post_scroll_wait_ms": 3000,
                "wait_selector": "main, article, .post, .search-results, #content, body",
                "wait_selector_timeout_ms": 30000,
                "goto_wait_until": "domcontentloaded",
                "retry_times": 2,
                "retry_interval_ms": 1200,
                "min_content_len": 120
            }
        }
    }
]
  128. # === 日志配置函数 (保持不变) ===
  129. def setup_logging():
  130. if not os.path.exists('logs'):
  131. os.makedirs('logs')
  132. logger = logging.getLogger('darkweb_spider')
  133. logger.setLevel(logging.INFO)
  134. formatter = logging.Formatter(
  135. '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
  136. )
  137. info_handler = RotatingFileHandler('logs/info.log', maxBytes=10 * 1024 * 1024, backupCount=10, encoding='utf-8')
  138. info_handler.setLevel(logging.INFO)
  139. info_handler.setFormatter(formatter)
  140. error_handler = RotatingFileHandler('logs/error.log', maxBytes=10 * 1024 * 1024, backupCount=10, encoding='utf-8')
  141. error_handler.setLevel(logging.ERROR)
  142. error_handler.setFormatter(formatter)
  143. logger.addHandler(info_handler)
  144. logger.addHandler(error_handler)
  145. return logger
# Initialize the module-level logger object
logger = setup_logging()
  148. def _get_profile_by_url(target_url):
  149. """
  150. +
  151. """
  152. try:
  153. parsed = urlparse(target_url)
  154. hostname = (parsed.hostname or "").lower()
  155. path = parsed.path or "/"
  156. except Exception:
  157. hostname = ""
  158. path = "/"
  159. profile = SITE_PROFILES.get(hostname, {}).copy()
  160. # 覆盖规则:命中则叠加到当前 profile 上
  161. for rule in RULE_PROFILES:
  162. matcher = rule.get("match", {})
  163. host_suffix = (matcher.get("hostname_suffix") or "").lower()
  164. host_prefix = (matcher.get("hostname_prefix") or "").lower()
  165. path_contains = matcher.get("path_contains", [])
  166. host_ok = (not host_suffix) or hostname.endswith(host_suffix)
  167. if host_prefix:
  168. host_ok = host_ok and hostname.startswith(host_prefix)
  169. path_ok = True
  170. if path_contains:
  171. path_ok = any(token in path for token in path_contains)
  172. if host_ok and path_ok:
  173. rule_profile = rule.get("profile", {})
  174. profile = _merge_profile(profile, rule_profile)
  175. return profile
  176. def _merge_profile(base_profile, extra_profile):
  177. """
  178. profileextra_profile
  179. """
  180. merged = dict(base_profile or {})
  181. for key, value in (extra_profile or {}).items():
  182. if isinstance(value, dict) and isinstance(merged.get(key), dict):
  183. merged[key] = _merge_profile(merged[key], value)
  184. else:
  185. merged[key] = value
  186. return merged
  187. def _merge_headers(profile_headers, custom_headers):
  188. """
  189. """
  190. merged = {}
  191. if profile_headers:
  192. merged.update(profile_headers)
  193. if custom_headers:
  194. merged.update(custom_headers)
  195. if "User-Agent" not in merged:
  196. merged["User-Agent"] = DEFAULT_USER_AGENT
  197. return merged
  198. def _normalize_cookies(cookie_input):
  199. """
  200. dict cookie
  201. """
  202. if not cookie_input:
  203. return {}
  204. if isinstance(cookie_input, dict):
  205. return {str(k): str(v) for k, v in cookie_input.items()}
  206. if isinstance(cookie_input, str):
  207. cookie_map = {}
  208. parts = [p.strip() for p in cookie_input.split(";") if p.strip()]
  209. for part in parts:
  210. if "=" not in part:
  211. continue
  212. k, v = part.split("=", 1)
  213. cookie_map[k.strip()] = v.strip()
  214. return cookie_map
  215. return {}
  216. def _merge_cookies(base_cookies, more_cookies):
  217. """
  218. cookie
  219. """
  220. merged = {}
  221. if base_cookies:
  222. merged.update(base_cookies)
  223. if more_cookies:
  224. merged.update(more_cookies)
  225. return merged
  226. def _extract_cookie_from_headers(headers):
  227. """
  228. headers Cookie (clean_headers, cookie_map)
  229. """
  230. if not headers:
  231. return {}, {}
  232. clean_headers = dict(headers)
  233. cookie_value = clean_headers.pop("Cookie", None)
  234. if cookie_value is None:
  235. cookie_value = clean_headers.pop("cookie", None)
  236. cookie_map = _normalize_cookies(cookie_value)
  237. return clean_headers, cookie_map
  238. def _sanitize_dynamic_headers(headers):
  239. """
  240. Playwright
  241. """
  242. blocked = {"host", "content-length", "connection", "cookie"}
  243. return {k: v for k, v in (headers or {}).items() if k.lower() not in blocked}
  244. def _build_dynamic_target_urls(target_url):
  245. """
  246. onion 访 URL
  247. 使 URL https onion退 http
  248. """
  249. candidates = [target_url]
  250. try:
  251. parsed = urlparse(target_url)
  252. hostname = (parsed.hostname or "").lower()
  253. if parsed.scheme == "https" and hostname.endswith(".onion"):
  254. candidates.append(target_url.replace("https://", "http://", 1))
  255. except Exception:
  256. pass
  257. return candidates
  258. def _should_retry_status(status_code, retry_status_codes):
  259. """
  260. """
  261. try:
  262. code = int(status_code)
  263. except Exception:
  264. return False
  265. return code in set(retry_status_codes or [])
  266. # === 核心方法 1: 动态采集实现 (Playwright) ===
  267. async def _fetch_dynamic_content_async(target_url, client_ip, runtime_cfg):
  268. """
  269. 使 Playwright + Firefox ()
  270. """
  271. logger.info(f"[动态] 启动浏览器内核 | 来源IP: {client_ip} | 目标: {target_url}")
  272. start_time = time.time()
  273. req_headers = runtime_cfg.get("headers", {})
  274. req_cookies = runtime_cfg.get("cookies", {})
  275. dynamic_cfg = runtime_cfg.get("dynamic", {})
  276. initial_wait_ms = int(dynamic_cfg.get("initial_wait_ms", 5000))
  277. post_scroll_wait_ms = int(dynamic_cfg.get("post_scroll_wait_ms", 2000))
  278. wait_selector = dynamic_cfg.get("wait_selector")
  279. wait_selector_timeout_ms = int(dynamic_cfg.get("wait_selector_timeout_ms", 20000))
  280. goto_timeout_ms = int(dynamic_cfg.get("goto_timeout_ms", 90000))
  281. goto_wait_until = dynamic_cfg.get("goto_wait_until", "load")
  282. retry_times = int(dynamic_cfg.get("retry_times", 1))
  283. retry_interval_ms = int(dynamic_cfg.get("retry_interval_ms", 1000))
  284. min_content_len = int(dynamic_cfg.get("min_content_len", 1))
  285. retry_status_codes = dynamic_cfg.get("retry_status_codes", [502, 503, 504])
  286. dynamic_headers = _sanitize_dynamic_headers(req_headers)
  287. async with async_playwright() as p:
  288. browser = None
  289. try:
  290. browser = await p.firefox.launch(
  291. headless=True,
  292. proxy={"server": DYNAMIC_PROXY_SERVER}
  293. )
  294. context = await browser.new_context(
  295. user_agent=req_headers.get("User-Agent", DEFAULT_USER_AGENT),
  296. extra_http_headers=dynamic_headers,
  297. ignore_https_errors=True
  298. )
  299. if req_cookies:
  300. await context.add_cookies([
  301. {"name": k, "value": v, "domain": urlparse(target_url).hostname, "path": "/"}
  302. for k, v in req_cookies.items()
  303. ])
  304. page = await context.new_page()
  305. response = None
  306. html_content = ""
  307. status_code = 0
  308. last_error = None
  309. candidate_urls = _build_dynamic_target_urls(target_url)
  310. total_attempts = max(1, retry_times)
  311. for attempt in range(1, total_attempts + 1):
  312. for candidate_url in candidate_urls:
  313. logger.info(
  314. f"[动态] 第{attempt}/{total_attempts}次访问 | Timeout {goto_timeout_ms / 1000:.0f}s | URL: {candidate_url}"
  315. )
  316. try:
  317. response = await page.goto(candidate_url, timeout=goto_timeout_ms, wait_until=goto_wait_until)
  318. if not response:
  319. raise Exception("Response is None")
  320. if wait_selector:
  321. try:
  322. await page.wait_for_selector(wait_selector, timeout=wait_selector_timeout_ms)
  323. except Exception:
  324. logger.info(f"[动态] 等待选择器超时,继续执行 | selector={wait_selector}")
  325. await page.wait_for_timeout(initial_wait_ms)
  326. await _auto_scroll(page, logger, candidate_url)
  327. await page.wait_for_timeout(post_scroll_wait_ms)
  328. html_content = await page.content() or ""
  329. status_code = response.status
  330. if _should_retry_status(status_code, retry_status_codes):
  331. raise Exception(f"命中可重试状态码 | status_code={status_code}")
  332. if len(html_content) < min_content_len:
  333. raise Exception(f"内容过短,疑似未获取到正文 | length={len(html_content)}")
  334. cost_time = time.time() - start_time
  335. logger.info(
  336. f"[动态] 采集成功 | 耗时: {cost_time:.2f}s | 状态码: {status_code} | 长度: {len(html_content)}"
  337. )
  338. return {
  339. "status_code": status_code,
  340. "content": html_content,
  341. }
  342. except Exception as nav_err:
  343. last_error = nav_err
  344. logger.warning(f"[动态] 当前尝试失败 | URL: {candidate_url} | Error: {str(nav_err)}")
  345. if attempt < total_attempts:
  346. await page.wait_for_timeout(retry_interval_ms)
  347. if last_error:
  348. raise last_error
  349. raise Exception("动态采集失败,未获得有效响应")
  350. except Exception as e:
  351. raise e
  352. finally:
  353. if browser:
  354. await browser.close()
  355. def fetch_dynamic_content(target_url, client_ip, runtime_cfg):
  356. """
  357. 线 Async Playwright sync API loop
  358. """
  359. future = DYNAMIC_EXECUTOR.submit(
  360. lambda: asyncio.run(_fetch_dynamic_content_async(target_url, client_ip, runtime_cfg))
  361. )
  362. return future.result()
  363. # === 核心方法 2: 静态采集实现 (Requests) ===
  364. def fetch_static_content(target_url, client_ip, runtime_cfg):
  365. """
  366. 使 Requests
  367. """
  368. logger.info(f"[静态] 发起请求 | 来源IP: {client_ip} | 目标: {target_url}")
  369. headers = runtime_cfg.get("headers", {})
  370. cookies = runtime_cfg.get("cookies", {})
  371. static_cfg = runtime_cfg.get("static", {})
  372. timeout = int(static_cfg.get("timeout_s", 60))
  373. allow_redirects = bool(static_cfg.get("allow_redirects", True))
  374. retry_times = int(static_cfg.get("retry_times", 1))
  375. retry_interval_ms = int(static_cfg.get("retry_interval_ms", 1000))
  376. min_content_len = int(static_cfg.get("min_content_len", 1))
  377. retry_status_codes = static_cfg.get("retry_status_codes", [502, 503, 504])
  378. last_error = None
  379. total_attempts = max(1, retry_times)
  380. for attempt in range(1, total_attempts + 1):
  381. try:
  382. # 使用 Privoxy 代理
  383. resp = requests.get(
  384. target_url,
  385. proxies=STATIC_PROXIES,
  386. headers=headers,
  387. cookies=cookies,
  388. timeout=timeout,
  389. allow_redirects=allow_redirects
  390. )
  391. content = resp.text or ""
  392. if _should_retry_status(resp.status_code, retry_status_codes):
  393. raise Exception(f"命中可重试状态码 | status_code={resp.status_code}")
  394. if len(content) < min_content_len:
  395. raise Exception(f"内容过短,疑似未获取到正文 | length={len(content)}")
  396. logger.info(f"[静态] 采集成功 | 状态码: {resp.status_code} | URL: {target_url} | 长度: {len(content)}")
  397. return {
  398. "status_code": resp.status_code,
  399. "content": content,
  400. }
  401. except Exception as req_err:
  402. last_error = req_err
  403. logger.warning(f"[静态] 第{attempt}/{total_attempts}次请求失败 | URL: {target_url} | Error: {str(req_err)}")
  404. if attempt < total_attempts:
  405. time.sleep(retry_interval_ms / 1000.0)
  406. if last_error:
  407. raise last_error
  408. raise Exception("静态采集失败,未获得有效响应")
  409. # === 新增辅助函数:通用自动滚动逻辑 ===
  410. async def _auto_scroll(page, logger, target_url):
  411. """
  412. """
  413. logger.info(f"[动态] 开始自动滚动页面... | URL: {target_url}")
  414. # 最大的滚动次数 (防止无限加载的页面卡死程序)
  415. MAX_SCROLLS = 10
  416. # 每次滚动后的等待时间 (暗网建议长一点,3-5秒)
  417. WAIT_TIME = 3000
  418. previous_height = await page.evaluate("document.body.scrollHeight")
  419. for i in range(MAX_SCROLLS):
  420. # 1. 滚动到当前页面的最底部
  421. await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
  422. # 2. 等待页面加载新内容 (相当于 sleep)
  423. await page.wait_for_timeout(WAIT_TIME)
  424. # 3. 获取新的高度
  425. new_height = await page.evaluate("document.body.scrollHeight")
  426. if new_height == previous_height:
  427. logger.info(f"[动态] 滚动结束: 高度不再变化 (次数: {i})")
  428. break
  429. logger.info(f"[动态] 滚动触发加载: 高度从 {previous_height} 变为 {new_height}")
  430. previous_height = new_height
  431. else:
  432. logger.info(f"[动态] 滚动结束: 达到最大次数限制 ({MAX_SCROLLS})")
  433. # === API 路由入口 ===
  434. @app.route('/crawl', methods=['POST'])
  435. def crawl_onion():
  436. # 1. 解析参数
  437. request_data = request.get_json()
  438. client_ip = request.remote_addr
  439. if not request_data:
  440. logger.warning(f"请求体错误 | 来源IP: {client_ip}")
  441. return jsonify({"code": 400, "msg": "请发送 JSON 格式"}), 400
  442. target_url = request_data.get('url')
  443. # 获取是否动态的标志,默认为 False (静态)
  444. is_dynamic = request_data.get('is_dynamic', False)
  445. custom_headers = request_data.get('headers', {})
  446. custom_cookies = request_data.get('cookies', {})
  447. static_timeout = request_data.get('static_timeout_s', 60)
  448. goto_timeout_ms = request_data.get('dynamic_timeout_ms', 90000)
  449. wait_selector = request_data.get('wait_selector')
  450. wait_selector_timeout_ms = request_data.get('wait_selector_timeout_ms', 30000)
  451. referer = request_data.get('referer')
  452. if not target_url:
  453. logger.warning(f"参数缺失 | 来源IP: {client_ip}")
  454. return jsonify({"code": 400, "msg": "url 不能为空"}), 400
  455. profile = _get_profile_by_url(target_url)
  456. if profile.get("force_dynamic"):
  457. is_dynamic = True
  458. logger.info(f"[路由] 命中站点强制策略,自动切换动态模式 | URL: {target_url}")
  459. # 标记当前模式,用于日志
  460. mode_tag = "动态" if is_dynamic else "静态"
  461. merged_headers = _merge_headers(profile.get("headers", {}), custom_headers)
  462. header_without_cookie, header_cookie_map = _extract_cookie_from_headers(merged_headers)
  463. merged_headers = header_without_cookie
  464. if referer:
  465. merged_headers["Referer"] = referer
  466. merged_cookies = _merge_cookies(header_cookie_map, _normalize_cookies(custom_cookies))
  467. static_profile_cfg = profile.get("static", {}).copy()
  468. static_profile_cfg["timeout_s"] = static_timeout
  469. dynamic_profile_cfg = profile.get("dynamic", {}).copy()
  470. if wait_selector:
  471. dynamic_profile_cfg["wait_selector"] = wait_selector
  472. dynamic_profile_cfg["wait_selector_timeout_ms"] = wait_selector_timeout_ms
  473. dynamic_profile_cfg["goto_timeout_ms"] = goto_timeout_ms
  474. runtime_cfg = {
  475. "headers": merged_headers,
  476. "cookies": merged_cookies,
  477. "static": static_profile_cfg,
  478. "dynamic": dynamic_profile_cfg
  479. }
  480. try:
  481. result_data = {}
  482. # === 分流逻辑 ===
  483. if is_dynamic:
  484. # 走 Playwright
  485. result_data = fetch_dynamic_content(target_url, client_ip, runtime_cfg)
  486. else:
  487. # 走 Requests
  488. result_data = fetch_static_content(target_url, client_ip, runtime_cfg)
  489. # 统一返回格式
  490. return jsonify({
  491. "code": 200,
  492. "msg": "success",
  493. "mode": mode_tag, # 告诉调用者用了什么模式
  494. "data": {
  495. "status_code": result_data.get('status_code'),
  496. "url": target_url,
  497. "content": result_data.get('content')
  498. }
  499. })
  500. except requests.exceptions.Timeout:
  501. logger.error(f"[{mode_tag}] 请求超时 | URL: {target_url}")
  502. return jsonify({"code": 504, "msg": f"{mode_tag}请求超时"}), 504
  503. except requests.exceptions.ConnectionError:
  504. logger.error(f"[{mode_tag}] 代理连接失败 | URL: {target_url}")
  505. return jsonify({"code": 502, "msg": "代理连接失败,请检查服务状态"}), 502
  506. except Exception as e:
  507. # 捕获 Playwright 或其他未知错误
  508. logger.error(f"[{mode_tag}] 系统异常 | URL: {target_url} | Error: {str(e)}", exc_info=True)
  509. return jsonify({"code": 500, "msg": f"系统异常: {str(e)}"}), 500
if __name__ == '__main__':
    # In production, start the app with Gunicorn instead of this built-in server.
    app.run(host='0.0.0.0', port=8000)