|
|
package com.example; import com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements;
import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern;
public class projTopic { private static final String TOPIC_NAME = "projTopic"; private static final String BOOTSTRAP_SERVERS = "node-01:19092"; private static KafkaProducer<String, String> producer; private static ObjectMapper objectMapper = new ObjectMapper(); private static final Random random = new Random(); private static List<String> proxyList = new ArrayList<>(); // 代理池
private static int currentProxyIndex = 0; // 当前使用的代理索引
static { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
producer = new KafkaProducer<>(props); try { proxyList = Files.readAllLines(Paths.get("proxy.txt")); if (proxyList.isEmpty()) { System.out.println("警告: proxy.txt 为空,未加载任何代理"); } else { System.out.println("成功加载 " + proxyList.size() + " 个代理"); } } catch (IOException e) { System.err.println("读取 proxy.txt 失败: " + e.getMessage()); } }
public static void main(String[] args) throws IOException, InterruptedException { List<String> keywords = Files.readAllLines(Paths.get("keywords.txt")); List<String> cleanedKeywords = new ArrayList<>(); for (String keyword : keywords) { String cleaned = keyword.split(",")[0].trim(); // 取逗号前的部分并去除首尾空格
cleaned = cleaned.replaceAll("\\s+", "+"); // 替换所有空格为 +
cleanedKeywords.add(cleaned); } ExecutorService executor = Executors.newFixedThreadPool(4); // 4 个线程
for (String keyword : cleanedKeywords) { executor.submit(() -> { try { int sleepTime = random.nextInt(1001) + 30000; String load = "5|0|20|https://www.nsf.gov/awardsearch/jsp/gwt/search/|57BE5CA45E781DC0159F727F8A8205EB|gov.nsf.research.awardsearch.gwt.client.SearchAwardService|getAwards|gov.nsf.research.awardsearch.gwt.bean.SearchRequestBean/3930579236|com.extjs.gxt.ui.client.data.PagingLoadConfig|java.util.HashMap/962170901|java.lang.String/2004016611|QueryText|" + keyword + "|ActiveAwards|true|com.extjs.gxt.ui.client.data.BasePagingLoadConfig/2011366567|com.extjs.gxt.ui.client.data.RpcMap/3441186752|sortField|sortDir|com.extjs.gxt.ui.client.Style$SortDir/640452531|offset|java.lang.Integer/3438268394|limit|1|2|3|4|2|5|6|5|7|2|8|9|8|10|8|11|8|12|13|0|1|14|4|15|0|16|17|0|18|19|0|20|19|30|"; for(int i=0;;i++){ OkHttpClient client = createClientWithProxy(); MediaType mediaType = MediaType.parse("text/x-gwt-rpc; charset=UTF-8"); RequestBody body = RequestBody.create(mediaType, load);
Request request = new Request.Builder() .url("https://www.nsf.gov/awardsearch/jsp/gwt/search/.searchaward") .method("POST", body) .addHeader("Content-Type", "text/x-gwt-rpc; charset=UTF-8") .addHeader("X-GWT-Module-Base", "https://www.nsf.gov/awardsearch/jsp/gwt/search/") .addHeader("X-GWT-Permutation", "368C3CF86AA4CD7DB2250B35B844C1C2") // .addHeader("cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
.build(); Response response = executeWithRetry(client, request, keyword); String content = response.body().string();
Pattern pattern = Pattern.compile("\"awdNumber\",\"(\\d+)\""); Matcher matcher = pattern.matcher(content);
List<String> numbers = new ArrayList<>(); // 用于存储匹配的数字
// 查找并提取数字
List<String> additionalNumbers = new ArrayList<>(); List<String> urls = new ArrayList<>(); // 查找匹配项
while (matcher.find()) { // 获取捕获到的数字,并将其添加到列表中
numbers.add(matcher.group(1)); }
// 输出捕获到的数字
if (numbers.isEmpty()) { System.out.println("没找到awdNumber,继续下一种查找");
} else { for (String number : numbers) { additionalNumbers.add(number); } }
Pattern additionalPattern = Pattern.compile("\"[^\"]+\",\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\"(?:,\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\")?,\"(\\d+)\""); Matcher additionalMatcher = additionalPattern.matcher(content);
while (additionalMatcher.find()) { additionalNumbers.add(additionalMatcher.group(1)); } if (additionalNumbers.isEmpty()) { System.out.println("没找到下一页内容链接"); Thread.sleep(sleepTime); break; } else { for (String number : additionalNumbers) { String url = "https://www.nsf.gov/awardsearch/showAward?AWD_ID=" + number + "&HistoricalAwards=false"; urls.add(url); } } if (!urls.isEmpty() && urls.get(0).equals("https://www.nsf.gov/awardsearch/showAward?AWD_ID=2446604&HistoricalAwards=false")) { System.out.println("第一个 URL 是 AWD_ID=2446604,跳过关键词: " + keyword); Thread.sleep(sleepTime); return; // 跳出当前任务,处理下一个关键词
} for(String url:urls){ OkHttpClient client2 = createClientWithProxy(); MediaType mediaType2 = MediaType.parse("text/plain"); RequestBody body2 = RequestBody.create(mediaType2, ""); Request request2 = new Request.Builder() .url(url) .get() // .addHeader("Cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
.build(); Response response2 = executeWithRetry(client2, request2, keyword); System.out.println(response2.code()); String html = response2.body().string(); Document parse = Jsoup.parse(html); String title = parse.select(".pageheadline").text(); String projectNum = parse.select(".clear tr:nth-child(5) .tabletext2:nth-child(2)").text(); String projectLeader = parse.select(".clear tr:nth-child(13) .tabletext2:nth-child(2)").text(); String projectStartTime = convertToTimestamp(parse.select(".clear tr:nth-child(8) .tabletext2:nth-child(2)").text()); String projectEndTime = convertToTimestamp2(parse.select(".clear tr:nth-child(9) .tabletext2:nth-child(2)").text()); String sponsorPart = parse.select(".clear tr:nth-child(2) .tabletext2:nth-child(2)").text(); String country = "USA"; String brief = parse.select(".clear.margintop25 span").text(); String sponsor = parse.select(".clear tr:nth-child(1) .tabletext2:nth-child(2)").text(); String projectFunding = parse.select(".clear tr:nth-child(12) .tabletext2:nth-child(2)").text(); String relatedProject = parse.select(".clear tr:nth-child(20) .tabletext2:nth-child(2)").text();
String awardInstrument = parse.select(".clear tr:nth-child(6) .tabletext2:nth-child(2)").text(); String programManager = parse.select(".clear tr:nth-child(7) .tabletext2:nth-child(2)").text(); String totalIntendedAwardAmount = parse.select(".clear tr:nth-child(10) .tabletext2:nth-child(2)").text(); String totalAwardedAmountToDate = parse.select(".clear tr:nth-child(11) .tabletext2:nth-child(2)").text(); String recipientSponsoredResearchOffice = parse.select(".clear tr:nth-child(14) .tabletext2:nth-child(2)").text(); String sponsorCongressionalDistrict = parse.select(".clear tr:nth-child(15) .tabletext2:nth-child(2)").text(); String primaryPlaceOfPerformance = parse.select(".clear tr:nth-child(16) .tabletext2:nth-child(2)").text(); String primaryPlaceOfPerformanceCongressionalDistrict = parse.select(".clear tr:nth-child(17) .tabletext2:nth-child(2)").text(); String uniqueEntityIdentifier = parse.select(".clear tr:nth-child(18) .tabletext2:nth-child(2)").text(); String parentUEI = parse.select(".clear tr:nth-child(19) .tabletext2:nth-child(2)").text(); String primaryProgramSource = parse.select(".clear tr:nth-child(21) .tabletext2:nth-child(2)").text(); String programReferenceCode = parse.select(".clear tr:nth-child(22) .tabletext2:nth-child(2)").text(); String programElementCode = parse.select(".clear tr:nth-child(23) .tabletext2:nth-child(2)").text(); String awardAgencyCode = parse.select(".clear tr:nth-child(24) .tabletext2:nth-child(2)").text(); String fundAgencyCode = parse.select(".clear tr:nth-child(25) .tabletext2:nth-child(2)").text(); String assistanceListingNumber = parse.select(".clear tr:nth-child(26) .tabletext2:nth-child(2)").text(); String initialAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(3) .tabletext2:nth-child(2)").text()); String latestAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(4) .tabletext2:nth-child(2)").text());
List<Map<String, Object>> citations = extractAllCitationInfo(html); Map<String,Object> data = new HashMap<>(); data.put("title",title); data.put("projectNum",projectNum); data.put("projectLeader",projectLeader); data.put("projectStartTime",projectStartTime); data.put("projectEndTime",projectEndTime); data.put("sponsorPart",sponsorPart); data.put("country",country); data.put("brief",brief); data.put("sponsor",sponsor); data.put("projectFunding",projectFunding); data.put("relatedProject",relatedProject); data.put("awardInstrument",awardInstrument); data.put("programManager",programManager); data.put("totalIntendedAwardAmount",totalIntendedAwardAmount); data.put("totalAwardedAmountToDate",totalAwardedAmountToDate); data.put("recipientSponsoredResearchOffice",recipientSponsoredResearchOffice); data.put("sponsorCongressionalDistrict",sponsorCongressionalDistrict); data.put("primaryPlaceOfPerformance",primaryPlaceOfPerformance); data.put("primaryPlaceOfPerformanceCongressionalDistrict",primaryPlaceOfPerformanceCongressionalDistrict); data.put("uniqueEntityIdentifier",uniqueEntityIdentifier); data.put("parentUEI",parentUEI); data.put("primaryProgramSource",primaryProgramSource); data.put("programReferenceCode",programReferenceCode); data.put("programElementCode",programElementCode); data.put("awardAgencyCode",awardAgencyCode); data.put("fundAgencyCode",fundAgencyCode); data.put("assistanceListingNumber",assistanceListingNumber); data.put("publications",citations); data.put("initialAmendmentDate",initialAmendmentDate); data.put("latestAmendmentDate",latestAmendmentDate); data.put("crawlUrl",url); data.put("crawlTime",localDateTime()); Map<String,Object> result = new HashMap<>(); result.put("keyword",keyword); result.put("data",data); try { String jsonValue = objectMapper.writeValueAsString(result); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, projectNum, jsonValue);
producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.println("成功发送到Kafka - Partition: " + metadata.partition() + ", Offset: " + metadata.offset()); } else { System.err.println("发送到Kafka失败: " + exception.getMessage()); } }); } catch (Exception e) { System.err.println("序列化或发送Kafka消息失败: " + e.getMessage());
}
Thread.sleep(sleepTime); } load = increaseOffsetBy30(load); }
} catch (Exception e) { System.err.println("处理 " + keyword + " 失败: " + e.getMessage()); e.printStackTrace(); } }); } executor.shutdown(); executor.awaitTermination(5, TimeUnit.HOURS); producer.close(); }
public static String convertToTimestamp(String dateStr) { try { // Parse "Jan. 9, 2025" (abbreviated month, dot, comma-separated)
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH); LocalDate date = LocalDate.parse(dateStr, inputFormatter);
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return date.atStartOfDay().format(outputFormatter); } catch (Exception e) { e.printStackTrace(); return null; }
} public static String convertToTimestamp2(String dateStr) { try { // 移除 "(Estimated)" 部分
String cleanDateStr = dateStr.replace(" (Estimated)", "").trim();
// Parse "June 30, 2025" (full month, day, comma-separated year)
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH); LocalDate date = LocalDate.parse(cleanDateStr, inputFormatter);
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return date.atStartOfDay().format(outputFormatter); } catch (Exception e) { e.printStackTrace(); return null; } } public static List<Map<String, Object>> extractAllCitationInfo(String html) { Document doc = Jsoup.parse(html); List<Map<String, Object>> citations = new ArrayList<>();
// 选择所有 margintop15
Elements marginDivs = doc.select(".margintop15"); Pattern urlPattern = Pattern.compile("javascript:popwin\\('(.*?)'\\)");
for (Element div : marginDivs) { Map<String, Object> info = new HashMap<>();
// 提取 span 中的文本
Elements spans = div.select("> span"); if (spans.size() >= 3) { info.put("authors", spans.get(0).text()); info.put("title", spans.get(1).text()); info.put("year", spans.get(2).text()); }
// 提取链接
Elements links = div.select("a"); String doiUrl = ""; String citationUrl = ""; for (Element link : links) { String href = link.attr("href"); Matcher matcher = urlPattern.matcher(href); if (matcher.find()) { String url = matcher.group(1); if (link.text().contains("doi.org") && doiUrl.isEmpty()) { doiUrl = url; } else if (link.text().contains("引用详细信息") && citationUrl.isEmpty()) { citationUrl = url; } } } info.put("doiUrl", doiUrl); info.put("citationUrl", citationUrl);
// 添加到结果列表
citations.add(info); }
return citations; } public static String localDateTime(){ LocalDateTime dateTime = LocalDateTime.now();
// 创建 DateTimeFormatter,定义日期时间的格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 使用 formatter 格式化 LocalDateTime
String formattedDateTime = dateTime.format(formatter);
return formattedDateTime; // 输出类似: 2025-04-08 13:45:30
} public static String increaseOffsetBy30(String originalPayload) { // 以 "|" 分割载荷为数组
String[] parts = originalPayload.split("\\|");
// 检查数组长度,确保有足够元素
if (parts.length < 4) { throw new IllegalArgumentException("载荷格式无效,元素不足"); }
// 找到倒数第 4 个元素的位置
int targetIndex = parts.length - 4;
try { // 将倒数第 4 个数字解析为整数
int currentOffset = Integer.parseInt(parts[targetIndex]); // 增加 30
int newOffset = currentOffset + 30; // 将新值放回数组
parts[targetIndex] = String.valueOf(newOffset);
// 重新拼接载荷
return String.join("|", parts); } catch (NumberFormatException e) { throw new IllegalArgumentException("倒数第 4 个元素不是有效数字: " + parts[targetIndex]); } } private static Response executeWithRetry(OkHttpClient client, Request request, String keyword) throws IOException { int maxRetries = proxyList.isEmpty() ? 1 : proxyList.size(); // 如果没有代理,只尝试一次
int attempt = 0;
while (attempt < maxRetries) { Response response = client.newCall(request).execute(); if (response.code() == 403) { System.out.println("收到 403 状态码,尝试切换代理重试..."); response.close(); switchProxy(); client = createClientWithProxy(); // 使用新代理重建客户端
attempt++; if (attempt == maxRetries) { throw new IOException("所有代理尝试失败,仍然收到 403"); } continue; } return response; // 成功或非 403 状态码,直接返回
} throw new IOException("无法执行请求,未获取响应"); } private static OkHttpClient createClientWithProxy() { OkHttpClient.Builder builder = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .readTimeout(30, TimeUnit.SECONDS) .writeTimeout(30, TimeUnit.SECONDS);
if (!proxyList.isEmpty() && currentProxyIndex < proxyList.size()) { String proxy = proxyList.get(currentProxyIndex); String[] proxyParts = proxy.split(":"); if (proxyParts.length == 2) { String proxyHost = proxyParts[0]; int proxyPort = Integer.parseInt(proxyParts[1]); builder.proxy(new java.net.Proxy(java.net.Proxy.Type.HTTP, new java.net.InetSocketAddress(proxyHost, proxyPort))); System.out.println("使用代理: " + proxy); } } return builder.build(); } private static synchronized void switchProxy() { if (proxyList.isEmpty()) return; currentProxyIndex = (currentProxyIndex + 1) % proxyList.size(); System.out.println("切换到新代理: " + proxyList.get(currentProxyIndex)); } }
|