You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

403 lines
23 KiB

1 month ago
  1. package com.example;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import okhttp3.*;
  4. import org.apache.kafka.clients.producer.KafkaProducer;
  5. import org.apache.kafka.clients.producer.ProducerConfig;
  6. import org.apache.kafka.clients.producer.ProducerRecord;
  7. import org.apache.kafka.common.serialization.StringSerializer;
  8. import org.jsoup.Jsoup;
  9. import org.jsoup.nodes.Document;
  10. import org.jsoup.nodes.Element;
  11. import org.jsoup.select.Elements;
  12. import java.io.IOException;
  13. import java.nio.file.Files;
  14. import java.nio.file.Paths;
  15. import java.time.LocalDate;
  16. import java.time.LocalDateTime;
  17. import java.time.format.DateTimeFormatter;
  18. import java.util.*;
  19. import java.util.concurrent.ExecutorService;
  20. import java.util.concurrent.Executors;
  21. import java.util.concurrent.TimeUnit;
  22. import java.util.regex.Matcher;
  23. import java.util.regex.Pattern;
  24. public class projTopic {
  25. private static final String TOPIC_NAME = "projTopic";
  26. private static final String BOOTSTRAP_SERVERS = "node-01:19092";
  27. private static KafkaProducer<String, String> producer;
  28. private static ObjectMapper objectMapper = new ObjectMapper();
  29. private static final Random random = new Random();
  30. private static List<String> proxyList = new ArrayList<>(); // 代理池
  31. private static int currentProxyIndex = 0; // 当前使用的代理索引
  32. static {
  33. Properties props = new Properties();
  34. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  35. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  36. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  37. props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
  38. props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
  39. producer = new KafkaProducer<>(props);
  40. try {
  41. proxyList = Files.readAllLines(Paths.get("proxy.txt"));
  42. if (proxyList.isEmpty()) {
  43. System.out.println("警告: proxy.txt 为空,未加载任何代理");
  44. } else {
  45. System.out.println("成功加载 " + proxyList.size() + " 个代理");
  46. }
  47. } catch (IOException e) {
  48. System.err.println("读取 proxy.txt 失败: " + e.getMessage());
  49. }
  50. }
  51. public static void main(String[] args) throws IOException, InterruptedException {
  52. List<String> keywords = Files.readAllLines(Paths.get("keywords.txt"));
  53. List<String> cleanedKeywords = new ArrayList<>();
  54. for (String keyword : keywords) {
  55. String cleaned = keyword.split(",")[0].trim(); // 取逗号前的部分并去除首尾空格
  56. cleaned = cleaned.replaceAll("\\s+", "+"); // 替换所有空格为 +
  57. cleanedKeywords.add(cleaned);
  58. }
  59. ExecutorService executor = Executors.newFixedThreadPool(4); // 4 个线程
  60. for (String keyword : cleanedKeywords) {
  61. executor.submit(() -> {
  62. try {
  63. int sleepTime = random.nextInt(1001) + 30000;
  64. String load = "5|0|20|https://www.nsf.gov/awardsearch/jsp/gwt/search/|57BE5CA45E781DC0159F727F8A8205EB|gov.nsf.research.awardsearch.gwt.client.SearchAwardService|getAwards|gov.nsf.research.awardsearch.gwt.bean.SearchRequestBean/3930579236|com.extjs.gxt.ui.client.data.PagingLoadConfig|java.util.HashMap/962170901|java.lang.String/2004016611|QueryText|" + keyword + "|ActiveAwards|true|com.extjs.gxt.ui.client.data.BasePagingLoadConfig/2011366567|com.extjs.gxt.ui.client.data.RpcMap/3441186752|sortField|sortDir|com.extjs.gxt.ui.client.Style$SortDir/640452531|offset|java.lang.Integer/3438268394|limit|1|2|3|4|2|5|6|5|7|2|8|9|8|10|8|11|8|12|13|0|1|14|4|15|0|16|17|0|18|19|0|20|19|30|";
  65. for(int i=0;;i++){
  66. OkHttpClient client = createClientWithProxy();
  67. MediaType mediaType = MediaType.parse("text/x-gwt-rpc; charset=UTF-8");
  68. RequestBody body = RequestBody.create(mediaType, load);
  69. Request request = new Request.Builder()
  70. .url("https://www.nsf.gov/awardsearch/jsp/gwt/search/.searchaward")
  71. .method("POST", body)
  72. .addHeader("Content-Type", "text/x-gwt-rpc; charset=UTF-8")
  73. .addHeader("X-GWT-Module-Base", "https://www.nsf.gov/awardsearch/jsp/gwt/search/")
  74. .addHeader("X-GWT-Permutation", "368C3CF86AA4CD7DB2250B35B844C1C2")
  75. // .addHeader("cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
  76. .build();
  77. Response response = executeWithRetry(client, request, keyword);
  78. String content = response.body().string();
  79. Pattern pattern = Pattern.compile("\"awdNumber\",\"(\\d+)\"");
  80. Matcher matcher = pattern.matcher(content);
  81. List<String> numbers = new ArrayList<>(); // 用于存储匹配的数字
  82. // 查找并提取数字
  83. List<String> additionalNumbers = new ArrayList<>();
  84. List<String> urls = new ArrayList<>();
  85. // 查找匹配项
  86. while (matcher.find()) {
  87. // 获取捕获到的数字,并将其添加到列表中
  88. numbers.add(matcher.group(1));
  89. }
  90. // 输出捕获到的数字
  91. if (numbers.isEmpty()) {
  92. System.out.println("没找到awdNumber,继续下一种查找");
  93. } else {
  94. for (String number : numbers) {
  95. additionalNumbers.add(number);
  96. }
  97. }
  98. Pattern additionalPattern = Pattern.compile("\"[^\"]+\",\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\"(?:,\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\")?,\"(\\d+)\"");
  99. Matcher additionalMatcher = additionalPattern.matcher(content);
  100. while (additionalMatcher.find()) {
  101. additionalNumbers.add(additionalMatcher.group(1));
  102. }
  103. if (additionalNumbers.isEmpty()) {
  104. System.out.println("没找到下一页内容链接");
  105. Thread.sleep(sleepTime);
  106. break;
  107. } else {
  108. for (String number : additionalNumbers) {
  109. String url = "https://www.nsf.gov/awardsearch/showAward?AWD_ID=" + number + "&HistoricalAwards=false";
  110. urls.add(url);
  111. }
  112. }
  113. if (!urls.isEmpty() && urls.get(0).equals("https://www.nsf.gov/awardsearch/showAward?AWD_ID=2446604&HistoricalAwards=false")) {
  114. System.out.println("第一个 URL 是 AWD_ID=2446604,跳过关键词: " + keyword);
  115. Thread.sleep(sleepTime);
  116. return; // 跳出当前任务,处理下一个关键词
  117. }
  118. for(String url:urls){
  119. OkHttpClient client2 = createClientWithProxy();
  120. MediaType mediaType2 = MediaType.parse("text/plain");
  121. RequestBody body2 = RequestBody.create(mediaType2, "");
  122. Request request2 = new Request.Builder()
  123. .url(url)
  124. .get()
  125. // .addHeader("Cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
  126. .build();
  127. Response response2 = executeWithRetry(client2, request2, keyword);
  128. System.out.println(response2.code());
  129. String html = response2.body().string();
  130. Document parse = Jsoup.parse(html);
  131. String title = parse.select(".pageheadline").text();
  132. String projectNum = parse.select(".clear tr:nth-child(5) .tabletext2:nth-child(2)").text();
  133. String projectLeader = parse.select(".clear tr:nth-child(13) .tabletext2:nth-child(2)").text();
  134. String projectStartTime = convertToTimestamp(parse.select(".clear tr:nth-child(8) .tabletext2:nth-child(2)").text());
  135. String projectEndTime = convertToTimestamp2(parse.select(".clear tr:nth-child(9) .tabletext2:nth-child(2)").text());
  136. String sponsorPart = parse.select(".clear tr:nth-child(2) .tabletext2:nth-child(2)").text();
  137. String country = "USA";
  138. String brief = parse.select(".clear.margintop25 span").text();
  139. String sponsor = parse.select(".clear tr:nth-child(1) .tabletext2:nth-child(2)").text();
  140. String projectFunding = parse.select(".clear tr:nth-child(12) .tabletext2:nth-child(2)").text();
  141. String relatedProject = parse.select(".clear tr:nth-child(20) .tabletext2:nth-child(2)").text();
  142. String awardInstrument = parse.select(".clear tr:nth-child(6) .tabletext2:nth-child(2)").text();
  143. String programManager = parse.select(".clear tr:nth-child(7) .tabletext2:nth-child(2)").text();
  144. String totalIntendedAwardAmount = parse.select(".clear tr:nth-child(10) .tabletext2:nth-child(2)").text();
  145. String totalAwardedAmountToDate = parse.select(".clear tr:nth-child(11) .tabletext2:nth-child(2)").text();
  146. String recipientSponsoredResearchOffice = parse.select(".clear tr:nth-child(14) .tabletext2:nth-child(2)").text();
  147. String sponsorCongressionalDistrict = parse.select(".clear tr:nth-child(15) .tabletext2:nth-child(2)").text();
  148. String primaryPlaceOfPerformance = parse.select(".clear tr:nth-child(16) .tabletext2:nth-child(2)").text();
  149. String primaryPlaceOfPerformanceCongressionalDistrict = parse.select(".clear tr:nth-child(17) .tabletext2:nth-child(2)").text();
  150. String uniqueEntityIdentifier = parse.select(".clear tr:nth-child(18) .tabletext2:nth-child(2)").text();
  151. String parentUEI = parse.select(".clear tr:nth-child(19) .tabletext2:nth-child(2)").text();
  152. String primaryProgramSource = parse.select(".clear tr:nth-child(21) .tabletext2:nth-child(2)").text();
  153. String programReferenceCode = parse.select(".clear tr:nth-child(22) .tabletext2:nth-child(2)").text();
  154. String programElementCode = parse.select(".clear tr:nth-child(23) .tabletext2:nth-child(2)").text();
  155. String awardAgencyCode = parse.select(".clear tr:nth-child(24) .tabletext2:nth-child(2)").text();
  156. String fundAgencyCode = parse.select(".clear tr:nth-child(25) .tabletext2:nth-child(2)").text();
  157. String assistanceListingNumber = parse.select(".clear tr:nth-child(26) .tabletext2:nth-child(2)").text();
  158. String initialAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(3) .tabletext2:nth-child(2)").text());
  159. String latestAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(4) .tabletext2:nth-child(2)").text());
  160. List<Map<String, Object>> citations = extractAllCitationInfo(html);
  161. Map<String,Object> data = new HashMap<>();
  162. data.put("title",title);
  163. data.put("projectNum",projectNum);
  164. data.put("projectLeader",projectLeader);
  165. data.put("projectStartTime",projectStartTime);
  166. data.put("projectEndTime",projectEndTime);
  167. data.put("sponsorPart",sponsorPart);
  168. data.put("country",country);
  169. data.put("brief",brief);
  170. data.put("sponsor",sponsor);
  171. data.put("projectFunding",projectFunding);
  172. data.put("relatedProject",relatedProject);
  173. data.put("awardInstrument",awardInstrument);
  174. data.put("programManager",programManager);
  175. data.put("totalIntendedAwardAmount",totalIntendedAwardAmount);
  176. data.put("totalAwardedAmountToDate",totalAwardedAmountToDate);
  177. data.put("recipientSponsoredResearchOffice",recipientSponsoredResearchOffice);
  178. data.put("sponsorCongressionalDistrict",sponsorCongressionalDistrict);
  179. data.put("primaryPlaceOfPerformance",primaryPlaceOfPerformance);
  180. data.put("primaryPlaceOfPerformanceCongressionalDistrict",primaryPlaceOfPerformanceCongressionalDistrict);
  181. data.put("uniqueEntityIdentifier",uniqueEntityIdentifier);
  182. data.put("parentUEI",parentUEI);
  183. data.put("primaryProgramSource",primaryProgramSource);
  184. data.put("programReferenceCode",programReferenceCode);
  185. data.put("programElementCode",programElementCode);
  186. data.put("awardAgencyCode",awardAgencyCode);
  187. data.put("fundAgencyCode",fundAgencyCode);
  188. data.put("assistanceListingNumber",assistanceListingNumber);
  189. data.put("publications",citations);
  190. data.put("initialAmendmentDate",initialAmendmentDate);
  191. data.put("latestAmendmentDate",latestAmendmentDate);
  192. data.put("crawlUrl",url);
  193. data.put("crawlTime",localDateTime());
  194. Map<String,Object> result = new HashMap<>();
  195. result.put("keyword",keyword);
  196. result.put("data",data);
  197. try {
  198. String jsonValue = objectMapper.writeValueAsString(result);
  199. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, projectNum, jsonValue);
  200. producer.send(record, (metadata, exception) -> {
  201. if (exception == null) {
  202. System.out.println("成功发送到Kafka - Partition: " + metadata.partition() +
  203. ", Offset: " + metadata.offset());
  204. } else {
  205. System.err.println("发送到Kafka失败: " + exception.getMessage());
  206. }
  207. });
  208. } catch (Exception e) {
  209. System.err.println("序列化或发送Kafka消息失败: " + e.getMessage());
  210. }
  211. Thread.sleep(sleepTime);
  212. }
  213. load = increaseOffsetBy30(load);
  214. }
  215. } catch (Exception e) {
  216. System.err.println("处理 " + keyword + " 失败: " + e.getMessage());
  217. e.printStackTrace();
  218. }
  219. });
  220. }
  221. executor.shutdown();
  222. executor.awaitTermination(5, TimeUnit.HOURS);
  223. producer.close();
  224. }
  225. public static String convertToTimestamp(String dateStr) {
  226. try {
  227. // Parse "Jan. 9, 2025" (abbreviated month, dot, comma-separated)
  228. DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
  229. LocalDate date = LocalDate.parse(dateStr, inputFormatter);
  230. // Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
  231. DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  232. return date.atStartOfDay().format(outputFormatter);
  233. } catch (Exception e) {
  234. e.printStackTrace();
  235. return null;
  236. }
  237. }
  238. public static String convertToTimestamp2(String dateStr) {
  239. try {
  240. // 移除 "(Estimated)" 部分
  241. String cleanDateStr = dateStr.replace(" (Estimated)", "").trim();
  242. // Parse "June 30, 2025" (full month, day, comma-separated year)
  243. DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
  244. LocalDate date = LocalDate.parse(cleanDateStr, inputFormatter);
  245. // Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
  246. DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  247. return date.atStartOfDay().format(outputFormatter);
  248. } catch (Exception e) {
  249. e.printStackTrace();
  250. return null;
  251. }
  252. }
  253. public static List<Map<String, Object>> extractAllCitationInfo(String html) {
  254. Document doc = Jsoup.parse(html);
  255. List<Map<String, Object>> citations = new ArrayList<>();
  256. // 选择所有 margintop15
  257. Elements marginDivs = doc.select(".margintop15");
  258. Pattern urlPattern = Pattern.compile("javascript:popwin\\('(.*?)'\\)");
  259. for (Element div : marginDivs) {
  260. Map<String, Object> info = new HashMap<>();
  261. // 提取 span 中的文本
  262. Elements spans = div.select("> span");
  263. if (spans.size() >= 3) {
  264. info.put("authors", spans.get(0).text());
  265. info.put("title", spans.get(1).text());
  266. info.put("year", spans.get(2).text());
  267. }
  268. // 提取链接
  269. Elements links = div.select("a");
  270. String doiUrl = "";
  271. String citationUrl = "";
  272. for (Element link : links) {
  273. String href = link.attr("href");
  274. Matcher matcher = urlPattern.matcher(href);
  275. if (matcher.find()) {
  276. String url = matcher.group(1);
  277. if (link.text().contains("doi.org") && doiUrl.isEmpty()) {
  278. doiUrl = url;
  279. } else if (link.text().contains("引用详细信息") && citationUrl.isEmpty()) {
  280. citationUrl = url;
  281. }
  282. }
  283. }
  284. info.put("doiUrl", doiUrl);
  285. info.put("citationUrl", citationUrl);
  286. // 添加到结果列表
  287. citations.add(info);
  288. }
  289. return citations;
  290. }
  291. public static String localDateTime(){
  292. LocalDateTime dateTime = LocalDateTime.now();
  293. // 创建 DateTimeFormatter,定义日期时间的格式
  294. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  295. // 使用 formatter 格式化 LocalDateTime
  296. String formattedDateTime = dateTime.format(formatter);
  297. return formattedDateTime; // 输出类似: 2025-04-08 13:45:30
  298. }
  299. public static String increaseOffsetBy30(String originalPayload) {
  300. // 以 "|" 分割载荷为数组
  301. String[] parts = originalPayload.split("\\|");
  302. // 检查数组长度,确保有足够元素
  303. if (parts.length < 4) {
  304. throw new IllegalArgumentException("载荷格式无效,元素不足");
  305. }
  306. // 找到倒数第 4 个元素的位置
  307. int targetIndex = parts.length - 4;
  308. try {
  309. // 将倒数第 4 个数字解析为整数
  310. int currentOffset = Integer.parseInt(parts[targetIndex]);
  311. // 增加 30
  312. int newOffset = currentOffset + 30;
  313. // 将新值放回数组
  314. parts[targetIndex] = String.valueOf(newOffset);
  315. // 重新拼接载荷
  316. return String.join("|", parts);
  317. } catch (NumberFormatException e) {
  318. throw new IllegalArgumentException("倒数第 4 个元素不是有效数字: " + parts[targetIndex]);
  319. }
  320. }
  321. private static Response executeWithRetry(OkHttpClient client, Request request, String keyword) throws IOException {
  322. int maxRetries = proxyList.isEmpty() ? 1 : proxyList.size(); // 如果没有代理,只尝试一次
  323. int attempt = 0;
  324. while (attempt < maxRetries) {
  325. Response response = client.newCall(request).execute();
  326. if (response.code() == 403) {
  327. System.out.println("收到 403 状态码,尝试切换代理重试...");
  328. response.close();
  329. switchProxy();
  330. client = createClientWithProxy(); // 使用新代理重建客户端
  331. attempt++;
  332. if (attempt == maxRetries) {
  333. throw new IOException("所有代理尝试失败,仍然收到 403");
  334. }
  335. continue;
  336. }
  337. return response; // 成功或非 403 状态码,直接返回
  338. }
  339. throw new IOException("无法执行请求,未获取响应");
  340. }
  341. private static OkHttpClient createClientWithProxy() {
  342. OkHttpClient.Builder builder = new OkHttpClient().newBuilder()
  343. .connectTimeout(30, TimeUnit.SECONDS)
  344. .readTimeout(30, TimeUnit.SECONDS)
  345. .writeTimeout(30, TimeUnit.SECONDS);
  346. if (!proxyList.isEmpty() && currentProxyIndex < proxyList.size()) {
  347. String proxy = proxyList.get(currentProxyIndex);
  348. String[] proxyParts = proxy.split(":");
  349. if (proxyParts.length == 2) {
  350. String proxyHost = proxyParts[0];
  351. int proxyPort = Integer.parseInt(proxyParts[1]);
  352. builder.proxy(new java.net.Proxy(java.net.Proxy.Type.HTTP,
  353. new java.net.InetSocketAddress(proxyHost, proxyPort)));
  354. System.out.println("使用代理: " + proxy);
  355. }
  356. }
  357. return builder.build();
  358. }
  359. private static synchronized void switchProxy() {
  360. if (proxyList.isEmpty()) return;
  361. currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
  362. System.out.println("切换到新代理: " + proxyList.get(currentProxyIndex));
  363. }
  364. }