package com.example; import com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; public class projTopic { private static final String TOPIC_NAME = "projTopic"; private static final String BOOTSTRAP_SERVERS = "node-01:19092"; private static KafkaProducer producer; private static ObjectMapper objectMapper = new ObjectMapper(); private static final Random random = new Random(); private static List proxyList = new ArrayList<>(); // 代理池 private static int currentProxyIndex = 0; // 当前使用的代理索引 static { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数 producer = new KafkaProducer<>(props); try { proxyList = Files.readAllLines(Paths.get("proxy.txt")); if (proxyList.isEmpty()) { System.out.println("警告: proxy.txt 为空,未加载任何代理"); } else { System.out.println("成功加载 " + proxyList.size() + " 个代理"); } } catch (IOException e) { System.err.println("读取 proxy.txt 失败: " + e.getMessage()); } } public static void main(String[] args) throws IOException, InterruptedException { List keywords = Files.readAllLines(Paths.get("keywords.txt")); List cleanedKeywords = new ArrayList<>(); for (String keyword : keywords) { String cleaned = keyword.split(",")[0].trim(); // 取逗号前的部分并去除首尾空格 cleaned = cleaned.replaceAll("\\s+", "+"); // 替换所有空格为 + cleanedKeywords.add(cleaned); } ExecutorService executor = Executors.newFixedThreadPool(4); // 4 个线程 for (String keyword : cleanedKeywords) { executor.submit(() -> { try { int sleepTime = random.nextInt(1001) + 30000; String load = "5|0|20|https://www.nsf.gov/awardsearch/jsp/gwt/search/|57BE5CA45E781DC0159F727F8A8205EB|gov.nsf.research.awardsearch.gwt.client.SearchAwardService|getAwards|gov.nsf.research.awardsearch.gwt.bean.SearchRequestBean/3930579236|com.extjs.gxt.ui.client.data.PagingLoadConfig|java.util.HashMap/962170901|java.lang.String/2004016611|QueryText|" + keyword + "|ActiveAwards|true|com.extjs.gxt.ui.client.data.BasePagingLoadConfig/2011366567|com.extjs.gxt.ui.client.data.RpcMap/3441186752|sortField|sortDir|com.extjs.gxt.ui.client.Style$SortDir/640452531|offset|java.lang.Integer/3438268394|limit|1|2|3|4|2|5|6|5|7|2|8|9|8|10|8|11|8|12|13|0|1|14|4|15|0|16|17|0|18|19|0|20|19|30|"; for(int i=0;;i++){ OkHttpClient client = createClientWithProxy(); MediaType mediaType = MediaType.parse("text/x-gwt-rpc; charset=UTF-8"); RequestBody body = RequestBody.create(mediaType, load); Request request = new Request.Builder() .url("https://www.nsf.gov/awardsearch/jsp/gwt/search/.searchaward") .method("POST", body) .addHeader("Content-Type", "text/x-gwt-rpc; charset=UTF-8") .addHeader("X-GWT-Module-Base", "https://www.nsf.gov/awardsearch/jsp/gwt/search/") .addHeader("X-GWT-Permutation", "368C3CF86AA4CD7DB2250B35B844C1C2") // .addHeader("cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB") .build(); Response response = executeWithRetry(client, request, keyword); String content = response.body().string(); Pattern pattern = Pattern.compile("\"awdNumber\",\"(\\d+)\""); Matcher matcher = pattern.matcher(content); List numbers = new ArrayList<>(); // 用于存储匹配的数字 // 查找并提取数字 List additionalNumbers = new ArrayList<>(); List urls = new ArrayList<>(); // 查找匹配项 while (matcher.find()) { // 获取捕获到的数字,并将其添加到列表中 numbers.add(matcher.group(1)); } // 输出捕获到的数字 if (numbers.isEmpty()) { System.out.println("没找到awdNumber,继续下一种查找"); } else { for (String number : numbers) { additionalNumbers.add(number); } } Pattern additionalPattern = Pattern.compile("\"[^\"]+\",\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\"(?:,\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\")?,\"(\\d+)\""); Matcher additionalMatcher = additionalPattern.matcher(content); while (additionalMatcher.find()) { additionalNumbers.add(additionalMatcher.group(1)); } if (additionalNumbers.isEmpty()) { System.out.println("没找到下一页内容链接"); Thread.sleep(sleepTime); break; } else { for (String number : additionalNumbers) { String url = "https://www.nsf.gov/awardsearch/showAward?AWD_ID=" + number + "&HistoricalAwards=false"; urls.add(url); } } if (!urls.isEmpty() && urls.get(0).equals("https://www.nsf.gov/awardsearch/showAward?AWD_ID=2446604&HistoricalAwards=false")) { System.out.println("第一个 URL 是 AWD_ID=2446604,跳过关键词: " + keyword); Thread.sleep(sleepTime); return; // 跳出当前任务,处理下一个关键词 } for(String url:urls){ OkHttpClient client2 = createClientWithProxy(); MediaType mediaType2 = MediaType.parse("text/plain"); RequestBody body2 = RequestBody.create(mediaType2, ""); Request request2 = new Request.Builder() .url(url) .get() // .addHeader("Cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB") .build(); Response response2 = executeWithRetry(client2, request2, keyword); System.out.println(response2.code()); String html = response2.body().string(); Document parse = Jsoup.parse(html); String title = parse.select(".pageheadline").text(); String projectNum = parse.select(".clear tr:nth-child(5) .tabletext2:nth-child(2)").text(); String projectLeader = parse.select(".clear tr:nth-child(13) .tabletext2:nth-child(2)").text(); String projectStartTime = convertToTimestamp(parse.select(".clear tr:nth-child(8) .tabletext2:nth-child(2)").text()); String projectEndTime = convertToTimestamp2(parse.select(".clear tr:nth-child(9) .tabletext2:nth-child(2)").text()); String sponsorPart = parse.select(".clear tr:nth-child(2) .tabletext2:nth-child(2)").text(); String country = "USA"; String brief = parse.select(".clear.margintop25 span").text(); String sponsor = parse.select(".clear tr:nth-child(1) .tabletext2:nth-child(2)").text(); String projectFunding = parse.select(".clear tr:nth-child(12) .tabletext2:nth-child(2)").text(); String relatedProject = parse.select(".clear tr:nth-child(20) .tabletext2:nth-child(2)").text(); String awardInstrument = parse.select(".clear tr:nth-child(6) .tabletext2:nth-child(2)").text(); String programManager = parse.select(".clear tr:nth-child(7) .tabletext2:nth-child(2)").text(); String totalIntendedAwardAmount = parse.select(".clear tr:nth-child(10) .tabletext2:nth-child(2)").text(); String totalAwardedAmountToDate = parse.select(".clear tr:nth-child(11) .tabletext2:nth-child(2)").text(); String recipientSponsoredResearchOffice = parse.select(".clear tr:nth-child(14) .tabletext2:nth-child(2)").text(); String sponsorCongressionalDistrict = parse.select(".clear tr:nth-child(15) .tabletext2:nth-child(2)").text(); String primaryPlaceOfPerformance = parse.select(".clear tr:nth-child(16) .tabletext2:nth-child(2)").text(); String primaryPlaceOfPerformanceCongressionalDistrict = parse.select(".clear tr:nth-child(17) .tabletext2:nth-child(2)").text(); String uniqueEntityIdentifier = parse.select(".clear tr:nth-child(18) .tabletext2:nth-child(2)").text(); String parentUEI = parse.select(".clear tr:nth-child(19) .tabletext2:nth-child(2)").text(); String primaryProgramSource = parse.select(".clear tr:nth-child(21) .tabletext2:nth-child(2)").text(); String programReferenceCode = parse.select(".clear tr:nth-child(22) .tabletext2:nth-child(2)").text(); String programElementCode = parse.select(".clear tr:nth-child(23) .tabletext2:nth-child(2)").text(); String awardAgencyCode = parse.select(".clear tr:nth-child(24) .tabletext2:nth-child(2)").text(); String fundAgencyCode = parse.select(".clear tr:nth-child(25) .tabletext2:nth-child(2)").text(); String assistanceListingNumber = parse.select(".clear tr:nth-child(26) .tabletext2:nth-child(2)").text(); String initialAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(3) .tabletext2:nth-child(2)").text()); String latestAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(4) .tabletext2:nth-child(2)").text()); List> citations = extractAllCitationInfo(html); Map data = new HashMap<>(); data.put("title",title); data.put("projectNum",projectNum); data.put("projectLeader",projectLeader); data.put("projectStartTime",projectStartTime); data.put("projectEndTime",projectEndTime); data.put("sponsorPart",sponsorPart); data.put("country",country); data.put("brief",brief); data.put("sponsor",sponsor); data.put("projectFunding",projectFunding); data.put("relatedProject",relatedProject); data.put("awardInstrument",awardInstrument); data.put("programManager",programManager); data.put("totalIntendedAwardAmount",totalIntendedAwardAmount); data.put("totalAwardedAmountToDate",totalAwardedAmountToDate); data.put("recipientSponsoredResearchOffice",recipientSponsoredResearchOffice); data.put("sponsorCongressionalDistrict",sponsorCongressionalDistrict); data.put("primaryPlaceOfPerformance",primaryPlaceOfPerformance); data.put("primaryPlaceOfPerformanceCongressionalDistrict",primaryPlaceOfPerformanceCongressionalDistrict); data.put("uniqueEntityIdentifier",uniqueEntityIdentifier); data.put("parentUEI",parentUEI); data.put("primaryProgramSource",primaryProgramSource); data.put("programReferenceCode",programReferenceCode); data.put("programElementCode",programElementCode); data.put("awardAgencyCode",awardAgencyCode); data.put("fundAgencyCode",fundAgencyCode); data.put("assistanceListingNumber",assistanceListingNumber); data.put("publications",citations); data.put("initialAmendmentDate",initialAmendmentDate); data.put("latestAmendmentDate",latestAmendmentDate); data.put("crawlUrl",url); data.put("crawlTime",localDateTime()); Map result = new HashMap<>(); result.put("keyword",keyword); result.put("data",data); try { String jsonValue = objectMapper.writeValueAsString(result); ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, projectNum, jsonValue); producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.println("成功发送到Kafka - Partition: " + metadata.partition() + ", Offset: " + metadata.offset()); } else { System.err.println("发送到Kafka失败: " + exception.getMessage()); } }); } catch (Exception e) { System.err.println("序列化或发送Kafka消息失败: " + e.getMessage()); } Thread.sleep(sleepTime); } load = increaseOffsetBy30(load); } } catch (Exception e) { System.err.println("处理 " + keyword + " 失败: " + e.getMessage()); e.printStackTrace(); } }); } executor.shutdown(); executor.awaitTermination(5, TimeUnit.HOURS); producer.close(); } public static String convertToTimestamp(String dateStr) { try { // Parse "Jan. 9, 2025" (abbreviated month, dot, comma-separated) DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH); LocalDate date = LocalDate.parse(dateStr, inputFormatter); // Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00) DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return date.atStartOfDay().format(outputFormatter); } catch (Exception e) { e.printStackTrace(); return null; } } public static String convertToTimestamp2(String dateStr) { try { // 移除 "(Estimated)" 部分 String cleanDateStr = dateStr.replace(" (Estimated)", "").trim(); // Parse "June 30, 2025" (full month, day, comma-separated year) DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH); LocalDate date = LocalDate.parse(cleanDateStr, inputFormatter); // Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00) DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return date.atStartOfDay().format(outputFormatter); } catch (Exception e) { e.printStackTrace(); return null; } } public static List> extractAllCitationInfo(String html) { Document doc = Jsoup.parse(html); List> citations = new ArrayList<>(); // 选择所有 margintop15 Elements marginDivs = doc.select(".margintop15"); Pattern urlPattern = Pattern.compile("javascript:popwin\\('(.*?)'\\)"); for (Element div : marginDivs) { Map info = new HashMap<>(); // 提取 span 中的文本 Elements spans = div.select("> span"); if (spans.size() >= 3) { info.put("authors", spans.get(0).text()); info.put("title", spans.get(1).text()); info.put("year", spans.get(2).text()); } // 提取链接 Elements links = div.select("a"); String doiUrl = ""; String citationUrl = ""; for (Element link : links) { String href = link.attr("href"); Matcher matcher = urlPattern.matcher(href); if (matcher.find()) { String url = matcher.group(1); if (link.text().contains("doi.org") && doiUrl.isEmpty()) { doiUrl = url; } else if (link.text().contains("引用详细信息") && citationUrl.isEmpty()) { citationUrl = url; } } } info.put("doiUrl", doiUrl); info.put("citationUrl", citationUrl); // 添加到结果列表 citations.add(info); } return citations; } public static String localDateTime(){ LocalDateTime dateTime = LocalDateTime.now(); // 创建 DateTimeFormatter,定义日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 使用 formatter 格式化 LocalDateTime String formattedDateTime = dateTime.format(formatter); return formattedDateTime; // 输出类似: 2025-04-08 13:45:30 } public static String increaseOffsetBy30(String originalPayload) { // 以 "|" 分割载荷为数组 String[] parts = originalPayload.split("\\|"); // 检查数组长度,确保有足够元素 if (parts.length < 4) { throw new IllegalArgumentException("载荷格式无效,元素不足"); } // 找到倒数第 4 个元素的位置 int targetIndex = parts.length - 4; try { // 将倒数第 4 个数字解析为整数 int currentOffset = Integer.parseInt(parts[targetIndex]); // 增加 30 int newOffset = currentOffset + 30; // 将新值放回数组 parts[targetIndex] = String.valueOf(newOffset); // 重新拼接载荷 return String.join("|", parts); } catch (NumberFormatException e) { throw new IllegalArgumentException("倒数第 4 个元素不是有效数字: " + parts[targetIndex]); } } private static Response executeWithRetry(OkHttpClient client, Request request, String keyword) throws IOException { int maxRetries = proxyList.isEmpty() ? 1 : proxyList.size(); // 如果没有代理,只尝试一次 int attempt = 0; while (attempt < maxRetries) { Response response = client.newCall(request).execute(); if (response.code() == 403) { System.out.println("收到 403 状态码,尝试切换代理重试..."); response.close(); switchProxy(); client = createClientWithProxy(); // 使用新代理重建客户端 attempt++; if (attempt == maxRetries) { throw new IOException("所有代理尝试失败,仍然收到 403"); } continue; } return response; // 成功或非 403 状态码,直接返回 } throw new IOException("无法执行请求,未获取响应"); } private static OkHttpClient createClientWithProxy() { OkHttpClient.Builder builder = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .readTimeout(30, TimeUnit.SECONDS) .writeTimeout(30, TimeUnit.SECONDS); if (!proxyList.isEmpty() && currentProxyIndex < proxyList.size()) { String proxy = proxyList.get(currentProxyIndex); String[] proxyParts = proxy.split(":"); if (proxyParts.length == 2) { String proxyHost = proxyParts[0]; int proxyPort = Integer.parseInt(proxyParts[1]); builder.proxy(new java.net.Proxy(java.net.Proxy.Type.HTTP, new java.net.InetSocketAddress(proxyHost, proxyPort))); System.out.println("使用代理: " + proxy); } } return builder.build(); } private static synchronized void switchProxy() { if (proxyList.isEmpty()) return; currentProxyIndex = (currentProxyIndex + 1) % proxyList.size(); System.out.println("切换到新代理: " + proxyList.get(currentProxyIndex)); } }