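# 暗网采集的部署 (dark-web crawler deployment)
# NOTE: the lines below are residue from the repository web page this file was copied from.
# You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 606 lines
# 22 KiB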

# -*- coding: utf-8 -*-
# 1. 【核心】必须在最开头打猴子补丁,支持异步并发 更新:增加了动态请求,将页面拉到最底部的操作
from gevent import monkey
monkey.patch_all()
import os
import logging
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
from logging.handlers import RotatingFileHandler
from flask import Flask, request, jsonify
import requests
from playwright.async_api import async_playwright
# Flask application object; the /crawl route below is registered on it.
app = Flask(__name__)
# === Configuration ===
# 1. Proxy for static requests (Privoxy, HTTP)
STATIC_PROXY_IP = "127.0.0.1"
STATIC_PROXY_PORT = "19095"
# Both URL schemes are sent through the same HTTP proxy endpoint.
STATIC_PROXIES = {
    scheme: f"http://{STATIC_PROXY_IP}:{STATIC_PROXY_PORT}"
    for scheme in ("http", "https")
}
# 2. Proxy for dynamic requests (Tor, SOCKS5)
# Playwright connects to the Tor port directly (noted by the author as more efficient).
DYNAMIC_PROXY_SERVER = "socks5://127.0.0.1:9050"
# Dedicated thread pool for dynamic crawling; async Playwright runs inside its threads.
DYNAMIC_MAX_WORKERS = 6
DYNAMIC_EXECUTOR = ThreadPoolExecutor(
    thread_name_prefix="pw-dyn",
    max_workers=DYNAMIC_MAX_WORKERS,
)
# User-Agent used whenever neither a profile nor the caller provides one.
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) "
    "Gecko/20100101 Firefox/140.0"
)
# Per-site strategies, keyed by exact lower-case hostname.
SITE_PROFILES = {
    "pitchprash4aqilfr7sbmuwve3pnkpylqwxjbj2q5o4szcfeea6d27yd.onion": {
        # Always crawl this host with the browser, whatever the caller requested.
        "force_dynamic": True,
        "headers": {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": DEFAULT_USER_AGENT,
        },
        # Settings read by the requests-based fetcher.
        "static": {
            "allow_redirects": True,
            "timeout_s": 90,
            "retry_times": 3,
            "retry_interval_ms": 1200,
        },
        # Settings read by the Playwright-based fetcher.
        "dynamic": {
            "initial_wait_ms": 8000,
            "post_scroll_wait_ms": 3000,
            "wait_selector": "main, article, .post, .search-results, #content, body",
            "wait_selector_timeout_ms": 30000,
            "goto_wait_until": "domcontentloaded",
            "retry_times": 3,
            "retry_interval_ms": 1500,
            "min_content_len": 200,
        },
    },
}
# Rule matching: covers whole classes of similar pages in bulk,
# e.g. the search page of any onion site.
# Each rule has a "match" section (hostname prefix/suffix, path substrings)
# and a "profile" section that is layered on top of the per-site profile.
RULE_PROFILES = [
    {
        "name": "xmh57_prefix_omega",
        "match": {
            "hostname_prefix": "xmh57jrknzkhv6y3ls3ubitzfqnkrwxhopf5aygthi7d6rplyvk3noyd",
            "path_contains": ["/cgi-bin/omega/omega"],
        },
        "profile": {
            "headers": {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": DEFAULT_USER_AGENT,
            },
            "static": {
                "allow_redirects": True,
                "timeout_s": 120,
                "retry_times": 4,
                "retry_interval_ms": 1500,
                "retry_status_codes": [502, 503, 504],
                "min_content_len": 120,
            },
            "dynamic": {
                "initial_wait_ms": 8000,
                "post_scroll_wait_ms": 2500,
                "wait_selector": "body, main, #content, .results, .record",
                "wait_selector_timeout_ms": 25000,
                "goto_wait_until": "domcontentloaded",
                "retry_times": 3,
                "retry_interval_ms": 1500,
                "retry_status_codes": [502, 503, 504],
                "min_content_len": 120,
            },
        },
    },
    {
        "name": "generic_onion_search",
        "match": {
            "hostname_suffix": ".onion",
            "path_contains": ["/search"],
        },
        "profile": {
            "headers": {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": DEFAULT_USER_AGENT,
            },
            "static": {
                "allow_redirects": True,
                "timeout_s": 90,
                "retry_times": 2,
                "retry_interval_ms": 1000,
            },
            "dynamic": {
                "initial_wait_ms": 8000,
                "post_scroll_wait_ms": 3000,
                "wait_selector": "main, article, .post, .search-results, #content, body",
                "wait_selector_timeout_ms": 30000,
                "goto_wait_until": "domcontentloaded",
                "retry_times": 2,
                "retry_interval_ms": 1200,
                "min_content_len": 120,
            },
        },
    },
]
# === Logging setup ===
def setup_logging():
    """
    Configure and return the 'darkweb_spider' logger.

    Two rotating files (10 MiB each, 10 backups, UTF-8) are attached:
    logs/info.log receives INFO and above, logs/error.log receives ERROR and above.
    The function is idempotent: calling it again does not attach a second
    handler for a file that is already being written to.

    Returns:
        logging.Logger: the configured logger.
    """
    # exist_ok avoids the check-then-create race when several workers start together.
    os.makedirs('logs', exist_ok=True)
    logger = logging.getLogger('darkweb_spider')
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )
    # File handlers expose the absolute path they write to as `baseFilename`.
    attached = {getattr(handler, 'baseFilename', None) for handler in logger.handlers}
    targets = (
        ('logs/info.log', logging.INFO),
        ('logs/error.log', logging.ERROR),
    )
    for filename, level in targets:
        if os.path.abspath(filename) in attached:
            # Already attached by an earlier call; adding it again would duplicate every record.
            continue
        handler = RotatingFileHandler(
            filename, maxBytes=10 * 1024 * 1024, backupCount=10, encoding='utf-8'
        )
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
# Module-level logger object used by the functions in this file.
logger = setup_logging()
def _get_profile_by_url(target_url):
    """
    Resolve the crawl profile for a URL.

    Starts from the SITE_PROFILES entry for the exact hostname (if any) and
    layers every matching RULE_PROFILES entry on top, in list order.
    A URL that cannot be parsed is treated as hostname "" and path "/".
    """
    try:
        parts = urlparse(target_url)
        hostname = (parts.hostname or "").lower()
        path = parts.path or "/"
    except Exception:
        hostname, path = "", "/"

    resolved = SITE_PROFILES.get(hostname, {}).copy()
    for rule in RULE_PROFILES:
        criteria = rule.get("match", {})
        suffix = (criteria.get("hostname_suffix") or "").lower()
        prefix = (criteria.get("hostname_prefix") or "").lower()
        tokens = criteria.get("path_contains", [])

        # An empty criterion always matches; every non-empty one must hold.
        if suffix and not hostname.endswith(suffix):
            continue
        if prefix and not hostname.startswith(prefix):
            continue
        if tokens and not any(token in path for token in tokens):
            continue

        # Overlay: a matching rule is merged onto what has been resolved so far.
        resolved = _merge_profile(resolved, rule.get("profile", {}))
    return resolved
def _merge_profile(base_profile, extra_profile):
    """
    Recursively merge two profile dicts into a new dict; extra_profile wins.

    Nested dicts present on both sides are merged key by key; any other value
    from extra_profile replaces the one from base_profile.
    """
    result = dict(base_profile or {})
    overrides = extra_profile or {}
    for name in overrides:
        incoming = overrides[name]
        current = result.get(name)
        if isinstance(incoming, dict) and isinstance(current, dict):
            result[name] = _merge_profile(current, incoming)
            continue
        result[name] = incoming
    return result
def _merge_headers(profile_headers, custom_headers):
    """
    Merge profile default headers with caller-supplied headers; the caller wins.

    Header names are matched case-insensitively, so a caller's "user-agent"
    replaces a profile's "User-Agent" instead of being sent alongside it.
    The spelling of the winning entry is kept. DEFAULT_USER_AGENT is added only
    when no User-Agent header of any spelling is present.

    Args:
        profile_headers: dict of site defaults, or None/empty.
        custom_headers: dict of caller headers, or None/empty.

    Returns:
        dict: a new header dict; neither input is modified.
    """
    merged = {}
    # lower-cased header name -> the key currently stored in `merged`
    stored_as = {}
    for source in (profile_headers, custom_headers):
        if not source:
            continue
        for name, value in source.items():
            folded = str(name).lower()
            previous = stored_as.get(folded)
            if previous is not None and previous != name:
                # Same header under a different spelling: drop the older entry.
                del merged[previous]
            merged[name] = value
            stored_as[folded] = name
    if "user-agent" not in stored_as:
        merged["User-Agent"] = DEFAULT_USER_AGENT
    return merged
def _normalize_cookies(cookie_input):
    """
    Turn cookies given as a dict or as a "k=v; k2=v2" string into a str->str dict.

    Empty input and unsupported types produce an empty dict. In a cookie
    string, fragments without "=" are ignored and only the first "=" splits
    name from value.
    """
    if not cookie_input:
        return {}
    if isinstance(cookie_input, dict):
        return {str(name): str(value) for name, value in cookie_input.items()}
    if not isinstance(cookie_input, str):
        return {}

    parsed = {}
    for chunk in cookie_input.split(";"):
        pair = chunk.strip()
        if not pair or "=" not in pair:
            continue
        name, _, value = pair.partition("=")
        parsed[name.strip()] = value.strip()
    return parsed
def _merge_cookies(base_cookies, more_cookies):
    """
    Combine two cookie dicts into a new dict; entries from more_cookies win.
    """
    combined = dict(base_cookies) if base_cookies else {}
    if more_cookies:
        combined.update(more_cookies)
    return combined
def _extract_cookie_from_headers(headers):
    """
    Split the Cookie header out of a header dict.

    The header name is matched case-insensitively, so "Cookie", "cookie" and
    "COOKIE" are all removed from the returned headers. If several spellings
    are present their cookies are combined, later entries overriding earlier ones.

    Args:
        headers: dict of header name -> value, or None/empty.

    Returns:
        tuple: (clean_headers, cookie_map) where clean_headers is a new dict
        without any Cookie header and cookie_map is the parsed cookie dict.
    """
    if not headers:
        return {}, {}
    clean_headers = {}
    cookie_values = []
    for name, value in dict(headers).items():
        if str(name).lower() == "cookie":
            cookie_values.append(value)
        else:
            clean_headers[name] = value
    cookie_map = {}
    for raw_value in cookie_values:
        cookie_map.update(_normalize_cookies(raw_value))
    return clean_headers, cookie_map
def _sanitize_dynamic_headers(headers):
    """
    Drop headers that should not be passed through to Playwright.

    Host, Content-Length, Connection and Cookie are removed (matched
    case-insensitively); everything else is returned in a new dict.
    """
    blocked = frozenset(("host", "content-length", "connection", "cookie"))
    cleaned = {}
    for name, value in (headers or {}).items():
        if name.lower() in blocked:
            continue
        cleaned[name] = value
    return cleaned
def _build_dynamic_target_urls(target_url):
    """
    Build the list of candidate URLs to try for a dynamic fetch.

    The original URL always comes first. For an https onion URL an http
    variant is appended as a fallback. Only the scheme itself is rewritten, so
    an upper-case scheme ("HTTPS://") is handled and an "https://" that appears
    later in the URL (e.g. inside the query string) is left untouched.

    Args:
        target_url: the URL requested by the caller.

    Returns:
        list: one or two candidate URLs, without duplicates.
    """
    candidates = [target_url]
    try:
        parsed = urlparse(target_url)
        hostname = (parsed.hostname or "").lower()
        needs_fallback = parsed.scheme == "https" and hostname.endswith(".onion")
    except Exception:
        # Deliberate best effort: an unparsable URL is tried as given.
        return candidates
    if not needs_fallback:
        return candidates
    # urlparse reported the scheme as https, so the first "https:" (any case) is the scheme.
    scheme_at = target_url.lower().find("https:")
    if scheme_at >= 0:
        fallback = target_url[:scheme_at] + "http" + target_url[scheme_at + len("https"):]
        if fallback not in candidates:
            candidates.append(fallback)
    return candidates
def _should_retry_status(status_code, retry_status_codes):
    """
    Tell whether a status code is one of the codes that should trigger a retry.

    A status that cannot be converted to int never triggers a retry.
    """
    try:
        numeric = int(status_code)
    except Exception:
        return False
    retryable = set(retry_status_codes or [])
    return numeric in retryable
# === Core method 1: dynamic crawling (Playwright) ===
async def _fetch_dynamic_content_async(target_url, client_ip, runtime_cfg):
    """
    Fetch a page with Playwright + Firefox through the dynamic proxy, with auto-scroll.

    Every attempt walks the candidate URLs (the original, plus an http
    fallback for https onion URLs). A navigation counts as successful when a
    response exists, its status is not in the retryable list and the rendered
    HTML is at least `min_content_len` characters long.

    Args:
        target_url: URL to load.
        client_ip: caller address, used for logging only.
        runtime_cfg: dict with optional "headers", "cookies" and "dynamic"
            sections; "dynamic" may carry initial_wait_ms, post_scroll_wait_ms,
            wait_selector, wait_selector_timeout_ms, goto_timeout_ms,
            goto_wait_until, retry_times, retry_interval_ms, min_content_len
            and retry_status_codes.

    Returns:
        dict: {"status_code": <navigation response status>, "content": <page HTML>}.

    Raises:
        Exception: the error of the last failed attempt once all attempts are
            used up, or any browser start-up error.
    """
    logger.info(f"[动态] 启动浏览器内核 | 来源IP: {client_ip} | 目标: {target_url}")
    start_time = time.time()
    req_headers = runtime_cfg.get("headers", {})
    req_cookies = runtime_cfg.get("cookies", {})
    dynamic_cfg = runtime_cfg.get("dynamic", {})
    initial_wait_ms = int(dynamic_cfg.get("initial_wait_ms", 5000))
    post_scroll_wait_ms = int(dynamic_cfg.get("post_scroll_wait_ms", 2000))
    wait_selector = dynamic_cfg.get("wait_selector")
    wait_selector_timeout_ms = int(dynamic_cfg.get("wait_selector_timeout_ms", 20000))
    goto_timeout_ms = int(dynamic_cfg.get("goto_timeout_ms", 90000))
    goto_wait_until = dynamic_cfg.get("goto_wait_until", "load")
    retry_times = int(dynamic_cfg.get("retry_times", 1))
    retry_interval_ms = int(dynamic_cfg.get("retry_interval_ms", 1000))
    min_content_len = int(dynamic_cfg.get("min_content_len", 1))
    retry_status_codes = dynamic_cfg.get("retry_status_codes", [502, 503, 504])
    dynamic_headers = _sanitize_dynamic_headers(req_headers)
    async with async_playwright() as p:
        browser = None
        try:
            browser = await p.firefox.launch(
                headless=True,
                proxy={"server": DYNAMIC_PROXY_SERVER}
            )
            context = await browser.new_context(
                user_agent=req_headers.get("User-Agent", DEFAULT_USER_AGENT),
                extra_http_headers=dynamic_headers,
                ignore_https_errors=True
            )
            if req_cookies:
                # Parse the hostname once instead of once per cookie.
                cookie_domain = urlparse(target_url).hostname
                await context.add_cookies([
                    {"name": k, "value": v, "domain": cookie_domain, "path": "/"}
                    for k, v in req_cookies.items()
                ])
            page = await context.new_page()
            last_error = None
            candidate_urls = _build_dynamic_target_urls(target_url)
            total_attempts = max(1, retry_times)
            for attempt in range(1, total_attempts + 1):
                for candidate_url in candidate_urls:
                    logger.info(
                        f"[动态] 第{attempt}/{total_attempts}次访问 | Timeout {goto_timeout_ms / 1000:.0f}s | URL: {candidate_url}"
                    )
                    try:
                        response = await page.goto(candidate_url, timeout=goto_timeout_ms, wait_until=goto_wait_until)
                        if not response:
                            raise Exception("Response is None")
                        status_code = response.status
                        # Fail fast: a retryable status is known right after navigation,
                        # so do not spend the wait/scroll time on a page that is discarded.
                        if _should_retry_status(status_code, retry_status_codes):
                            raise Exception(f"命中可重试状态码 | status_code={status_code}")
                        if wait_selector:
                            try:
                                await page.wait_for_selector(wait_selector, timeout=wait_selector_timeout_ms)
                            except Exception:
                                # Best effort: a missing selector does not fail the attempt.
                                logger.info(f"[动态] 等待选择器超时,继续执行 | selector={wait_selector}")
                        await page.wait_for_timeout(initial_wait_ms)
                        await _auto_scroll(page, logger, candidate_url)
                        await page.wait_for_timeout(post_scroll_wait_ms)
                        html_content = await page.content() or ""
                        if len(html_content) < min_content_len:
                            raise Exception(f"内容过短,疑似未获取到正文 | length={len(html_content)}")
                        cost_time = time.time() - start_time
                        logger.info(
                            f"[动态] 采集成功 | 耗时: {cost_time:.2f}s | 状态码: {status_code} | 长度: {len(html_content)}"
                        )
                        return {
                            "status_code": status_code,
                            "content": html_content,
                        }
                    except Exception as nav_err:
                        last_error = nav_err
                        logger.warning(f"[动态] 当前尝试失败 | URL: {candidate_url} | Error: {str(nav_err)}")
                        # Pause before the next try unless this was the final attempt.
                        if attempt < total_attempts:
                            await page.wait_for_timeout(retry_interval_ms)
            if last_error:
                raise last_error
            raise Exception("动态采集失败,未获得有效响应")
        finally:
            # Always release the browser process, on success and on failure.
            if browser:
                await browser.close()
def fetch_dynamic_content(target_url, client_ip, runtime_cfg):
    """
    Run the async Playwright fetch on the dedicated executor and wait for it.

    The coroutine is driven by its own asyncio.run() call inside a worker of
    DYNAMIC_EXECUTOR. This call blocks until the result is available and
    re-raises any exception raised by the fetch.
    """
    def _run_in_worker():
        return asyncio.run(
            _fetch_dynamic_content_async(target_url, client_ip, runtime_cfg)
        )

    pending = DYNAMIC_EXECUTOR.submit(_run_in_worker)
    return pending.result()
# === Core method 2: static crawling (Requests) ===
def fetch_static_content(target_url, client_ip, runtime_cfg):
    """
    Fetch a page with requests through the static (Privoxy) proxy.

    The request is repeated up to `retry_times` times (at least once). An
    attempt fails on any request error, on a status listed in
    `retry_status_codes`, or when the body is shorter than `min_content_len`.
    Returns {"status_code", "content"} on success; otherwise re-raises the
    error of the last attempt.
    """
    logger.info(f"[静态] 发起请求 | 来源IP: {client_ip} | 目标: {target_url}")
    static_cfg = runtime_cfg.get("static", {})
    request_headers = runtime_cfg.get("headers", {})
    request_cookies = runtime_cfg.get("cookies", {})
    timeout_s = int(static_cfg.get("timeout_s", 60))
    follow_redirects = bool(static_cfg.get("allow_redirects", True))
    attempts = max(1, int(static_cfg.get("retry_times", 1)))
    pause_s = int(static_cfg.get("retry_interval_ms", 1000)) / 1000.0
    min_len = int(static_cfg.get("min_content_len", 1))
    retryable_codes = static_cfg.get("retry_status_codes", [502, 503, 504])

    failure = None
    for attempt in range(1, attempts + 1):
        try:
            # Goes through the Privoxy proxy.
            resp = requests.get(
                target_url,
                proxies=STATIC_PROXIES,
                headers=request_headers,
                cookies=request_cookies,
                timeout=timeout_s,
                allow_redirects=follow_redirects,
            )
            body = resp.text or ""
            if _should_retry_status(resp.status_code, retryable_codes):
                raise Exception(f"命中可重试状态码 | status_code={resp.status_code}")
            if len(body) < min_len:
                raise Exception(f"内容过短,疑似未获取到正文 | length={len(body)}")
            logger.info(f"[静态] 采集成功 | 状态码: {resp.status_code} | URL: {target_url} | 长度: {len(body)}")
            return {
                "status_code": resp.status_code,
                "content": body,
            }
        except Exception as req_err:
            failure = req_err
            logger.warning(f"[静态] 第{attempt}/{attempts}次请求失败 | URL: {target_url} | Error: {str(req_err)}")
            # No pause after the final attempt.
            if attempt < attempts:
                time.sleep(pause_s)
    if failure:
        raise failure
    raise Exception("静态采集失败,未获得有效响应")
# === Helper: generic auto-scroll logic ===
async def _auto_scroll(page, logger, target_url, max_scrolls=10, wait_time_ms=3000):
    """
    Scroll the page to the bottom repeatedly to trigger lazy loading.

    Scrolling stops as soon as the document height no longer changes, or after
    `max_scrolls` rounds so an endlessly growing page cannot hang the crawl.

    Args:
        page: Playwright page (needs async `evaluate` and `wait_for_timeout`).
        logger: logger used for progress messages.
        target_url: URL being crawled, used for logging only.
        max_scrolls: upper bound on scroll rounds (default 10).
        wait_time_ms: pause after each scroll, in milliseconds (default 3000).

    Returns:
        None
    """
    logger.info(f"[动态] 开始自动滚动页面... | URL: {target_url}")
    # Fall back to the root element when the document has no <body>,
    # where `document.body` is null and reading scrollHeight would throw.
    height_js = "(document.body || document.documentElement).scrollHeight"
    previous_height = await page.evaluate(height_js)
    for i in range(max_scrolls):
        # 1. Scroll to the current bottom of the page.
        await page.evaluate(f"window.scrollTo(0, {height_js})")
        # 2. Give the page time to load new content.
        await page.wait_for_timeout(wait_time_ms)
        # 3. Measure again; an unchanged height means nothing more was loaded.
        new_height = await page.evaluate(height_js)
        if new_height == previous_height:
            logger.info(f"[动态] 滚动结束: 高度不再变化 (次数: {i})")
            break
        logger.info(f"[动态] 滚动触发加载: 高度从 {previous_height} 变为 {new_height}")
        previous_height = new_height
    else:
        # Reached only when the loop was not left through `break`.
        logger.info(f"[动态] 滚动结束: 达到最大次数限制 ({max_scrolls})")
# === API route entry point ===
@app.route('/crawl', methods=['POST'])
def crawl_onion():
    """
    POST /crawl: fetch one URL in static (requests) or dynamic (Playwright) mode.

    JSON body fields:
        url (required), is_dynamic, headers, cookies, referer,
        static_timeout_s, dynamic_timeout_ms, wait_selector,
        wait_selector_timeout_ms.

    Timeouts sent by the caller override the matched profile; when the caller
    sends none, the profile value is kept and the built-in default
    (60 s / 90000 ms / 30000 ms) is used only if the profile has none either.

    Returns:
        A JSON response: code 200 with mode and data on success, 400 for a bad
        request, 504/502 for requests timeout/connection errors, 500 otherwise.
    """
    # 1. Parse parameters. silent=True makes malformed JSON or a wrong
    #    Content-Type yield None, so the JSON 400 below is what the caller gets.
    request_data = request.get_json(silent=True)
    client_ip = request.remote_addr
    if not request_data or not isinstance(request_data, dict):
        logger.warning(f"请求体错误 | 来源IP: {client_ip}")
        return jsonify({"code": 400, "msg": "请发送 JSON 格式"}), 400
    target_url = request_data.get('url')
    # Dynamic-mode flag; defaults to False (static).
    is_dynamic = request_data.get('is_dynamic', False)
    custom_headers = request_data.get('headers') or {}
    custom_cookies = request_data.get('cookies', {})
    # None means "not supplied": the profile value must then stay in effect.
    static_timeout = request_data.get('static_timeout_s')
    goto_timeout_ms = request_data.get('dynamic_timeout_ms')
    wait_selector = request_data.get('wait_selector')
    wait_selector_timeout_ms = request_data.get('wait_selector_timeout_ms')
    referer = request_data.get('referer')
    if not target_url or not isinstance(target_url, str):
        logger.warning(f"参数缺失 | 来源IP: {client_ip}")
        return jsonify({"code": 400, "msg": "url 不能为空"}), 400
    if not isinstance(custom_headers, dict):
        # Header merging needs a mapping; reject other JSON types up front.
        logger.warning(f"请求体错误 | 来源IP: {client_ip}")
        return jsonify({"code": 400, "msg": "headers 必须是 JSON 对象"}), 400
    profile = _get_profile_by_url(target_url)
    if profile.get("force_dynamic"):
        is_dynamic = True
        logger.info(f"[路由] 命中站点强制策略,自动切换动态模式 | URL: {target_url}")
    # Mode label used in logs and in the response.
    mode_tag = "动态" if is_dynamic else "静态"
    merged_headers = _merge_headers(profile.get("headers", {}), custom_headers)
    header_without_cookie, header_cookie_map = _extract_cookie_from_headers(merged_headers)
    merged_headers = header_without_cookie
    if referer:
        merged_headers["Referer"] = referer
    merged_cookies = _merge_cookies(header_cookie_map, _normalize_cookies(custom_cookies))
    # Copies, so per-request overrides never leak into the shared profile tables.
    static_profile_cfg = profile.get("static", {}).copy()
    if static_timeout is not None:
        static_profile_cfg["timeout_s"] = static_timeout
    else:
        static_profile_cfg.setdefault("timeout_s", 60)
    dynamic_profile_cfg = profile.get("dynamic", {}).copy()
    if wait_selector:
        dynamic_profile_cfg["wait_selector"] = wait_selector
    if wait_selector_timeout_ms is not None:
        dynamic_profile_cfg["wait_selector_timeout_ms"] = wait_selector_timeout_ms
    else:
        dynamic_profile_cfg.setdefault("wait_selector_timeout_ms", 30000)
    if goto_timeout_ms is not None:
        dynamic_profile_cfg["goto_timeout_ms"] = goto_timeout_ms
    else:
        dynamic_profile_cfg.setdefault("goto_timeout_ms", 90000)
    runtime_cfg = {
        "headers": merged_headers,
        "cookies": merged_cookies,
        "static": static_profile_cfg,
        "dynamic": dynamic_profile_cfg
    }
    try:
        # === Dispatch ===
        if is_dynamic:
            # Playwright path
            result_data = fetch_dynamic_content(target_url, client_ip, runtime_cfg)
        else:
            # Requests path
            result_data = fetch_static_content(target_url, client_ip, runtime_cfg)
        # Uniform response format
        return jsonify({
            "code": 200,
            "msg": "success",
            "mode": mode_tag,  # tells the caller which mode was used
            "data": {
                "status_code": result_data.get('status_code'),
                "url": target_url,
                "content": result_data.get('content')
            }
        })
    except requests.exceptions.Timeout:
        logger.error(f"[{mode_tag}] 请求超时 | URL: {target_url}")
        return jsonify({"code": 504, "msg": f"{mode_tag}请求超时"}), 504
    except requests.exceptions.ConnectionError:
        logger.error(f"[{mode_tag}] 代理连接失败 | URL: {target_url}")
        return jsonify({"code": 502, "msg": "代理连接失败,请检查服务状态"}), 502
    except Exception as e:
        # Playwright errors and anything else unexpected end up here.
        logger.error(f"[{mode_tag}] 系统异常 | URL: {target_url} | Error: {str(e)}", exc_info=True)
        return jsonify({"code": 500, "msg": f"系统异常: {str(e)}"}), 500
if __name__ == "__main__":
    # Direct execution starts Flask's built-in server; use Gunicorn in production.
    app.run(port=8000, host="0.0.0.0")