You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
403 lines
23 KiB
403 lines
23 KiB
package com.example;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
|
|
|
|
public class projTopic {
|
|
private static final String TOPIC_NAME = "projTopic";
|
|
private static final String BOOTSTRAP_SERVERS = "node-01:19092";
|
|
private static KafkaProducer<String, String> producer;
|
|
private static ObjectMapper objectMapper = new ObjectMapper();
|
|
private static final Random random = new Random();
|
|
private static List<String> proxyList = new ArrayList<>(); // 代理池
|
|
private static int currentProxyIndex = 0; // 当前使用的代理索引
|
|
static {
|
|
Properties props = new Properties();
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
|
|
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
|
|
producer = new KafkaProducer<>(props);
|
|
try {
|
|
proxyList = Files.readAllLines(Paths.get("proxy.txt"));
|
|
if (proxyList.isEmpty()) {
|
|
System.out.println("警告: proxy.txt 为空,未加载任何代理");
|
|
} else {
|
|
System.out.println("成功加载 " + proxyList.size() + " 个代理");
|
|
}
|
|
} catch (IOException e) {
|
|
System.err.println("读取 proxy.txt 失败: " + e.getMessage());
|
|
}
|
|
}
|
|
|
|
public static void main(String[] args) throws IOException, InterruptedException {
|
|
List<String> keywords = Files.readAllLines(Paths.get("keywords.txt"));
|
|
List<String> cleanedKeywords = new ArrayList<>();
|
|
for (String keyword : keywords) {
|
|
String cleaned = keyword.split(",")[0].trim(); // 取逗号前的部分并去除首尾空格
|
|
cleaned = cleaned.replaceAll("\\s+", "+"); // 替换所有空格为 +
|
|
cleanedKeywords.add(cleaned);
|
|
}
|
|
ExecutorService executor = Executors.newFixedThreadPool(4); // 4 个线程
|
|
for (String keyword : cleanedKeywords) {
|
|
executor.submit(() -> {
|
|
try {
|
|
int sleepTime = random.nextInt(1001) + 30000;
|
|
String load = "5|0|20|https://www.nsf.gov/awardsearch/jsp/gwt/search/|57BE5CA45E781DC0159F727F8A8205EB|gov.nsf.research.awardsearch.gwt.client.SearchAwardService|getAwards|gov.nsf.research.awardsearch.gwt.bean.SearchRequestBean/3930579236|com.extjs.gxt.ui.client.data.PagingLoadConfig|java.util.HashMap/962170901|java.lang.String/2004016611|QueryText|" + keyword + "|ActiveAwards|true|com.extjs.gxt.ui.client.data.BasePagingLoadConfig/2011366567|com.extjs.gxt.ui.client.data.RpcMap/3441186752|sortField|sortDir|com.extjs.gxt.ui.client.Style$SortDir/640452531|offset|java.lang.Integer/3438268394|limit|1|2|3|4|2|5|6|5|7|2|8|9|8|10|8|11|8|12|13|0|1|14|4|15|0|16|17|0|18|19|0|20|19|30|";
|
|
for(int i=0;;i++){
|
|
OkHttpClient client = createClientWithProxy();
|
|
MediaType mediaType = MediaType.parse("text/x-gwt-rpc; charset=UTF-8");
|
|
RequestBody body = RequestBody.create(mediaType, load);
|
|
|
|
Request request = new Request.Builder()
|
|
.url("https://www.nsf.gov/awardsearch/jsp/gwt/search/.searchaward")
|
|
.method("POST", body)
|
|
.addHeader("Content-Type", "text/x-gwt-rpc; charset=UTF-8")
|
|
.addHeader("X-GWT-Module-Base", "https://www.nsf.gov/awardsearch/jsp/gwt/search/")
|
|
.addHeader("X-GWT-Permutation", "368C3CF86AA4CD7DB2250B35B844C1C2")
|
|
// .addHeader("cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
|
|
.build();
|
|
Response response = executeWithRetry(client, request, keyword);
|
|
String content = response.body().string();
|
|
|
|
Pattern pattern = Pattern.compile("\"awdNumber\",\"(\\d+)\"");
|
|
Matcher matcher = pattern.matcher(content);
|
|
|
|
List<String> numbers = new ArrayList<>(); // 用于存储匹配的数字
|
|
// 查找并提取数字
|
|
List<String> additionalNumbers = new ArrayList<>();
|
|
List<String> urls = new ArrayList<>();
|
|
// 查找匹配项
|
|
while (matcher.find()) {
|
|
// 获取捕获到的数字,并将其添加到列表中
|
|
numbers.add(matcher.group(1));
|
|
}
|
|
|
|
// 输出捕获到的数字
|
|
if (numbers.isEmpty()) {
|
|
System.out.println("没找到awdNumber,继续下一种查找");
|
|
|
|
} else {
|
|
for (String number : numbers) {
|
|
additionalNumbers.add(number);
|
|
}
|
|
}
|
|
|
|
Pattern additionalPattern = Pattern.compile("\"[^\"]+\",\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\"(?:,\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\")?,\"(\\d+)\"");
|
|
Matcher additionalMatcher = additionalPattern.matcher(content);
|
|
|
|
|
|
while (additionalMatcher.find()) {
|
|
additionalNumbers.add(additionalMatcher.group(1));
|
|
}
|
|
if (additionalNumbers.isEmpty()) {
|
|
System.out.println("没找到下一页内容链接");
|
|
Thread.sleep(sleepTime);
|
|
break;
|
|
} else {
|
|
for (String number : additionalNumbers) {
|
|
String url = "https://www.nsf.gov/awardsearch/showAward?AWD_ID=" + number + "&HistoricalAwards=false";
|
|
urls.add(url);
|
|
}
|
|
}
|
|
if (!urls.isEmpty() && urls.get(0).equals("https://www.nsf.gov/awardsearch/showAward?AWD_ID=2446604&HistoricalAwards=false")) {
|
|
System.out.println("第一个 URL 是 AWD_ID=2446604,跳过关键词: " + keyword);
|
|
Thread.sleep(sleepTime);
|
|
return; // 跳出当前任务,处理下一个关键词
|
|
}
|
|
for(String url:urls){
|
|
OkHttpClient client2 = createClientWithProxy();
|
|
MediaType mediaType2 = MediaType.parse("text/plain");
|
|
RequestBody body2 = RequestBody.create(mediaType2, "");
|
|
Request request2 = new Request.Builder()
|
|
.url(url)
|
|
.get()
|
|
// .addHeader("Cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
|
|
.build();
|
|
Response response2 = executeWithRetry(client2, request2, keyword);
|
|
System.out.println(response2.code());
|
|
String html = response2.body().string();
|
|
Document parse = Jsoup.parse(html);
|
|
String title = parse.select(".pageheadline").text();
|
|
String projectNum = parse.select(".clear tr:nth-child(5) .tabletext2:nth-child(2)").text();
|
|
String projectLeader = parse.select(".clear tr:nth-child(13) .tabletext2:nth-child(2)").text();
|
|
String projectStartTime = convertToTimestamp(parse.select(".clear tr:nth-child(8) .tabletext2:nth-child(2)").text());
|
|
String projectEndTime = convertToTimestamp2(parse.select(".clear tr:nth-child(9) .tabletext2:nth-child(2)").text());
|
|
String sponsorPart = parse.select(".clear tr:nth-child(2) .tabletext2:nth-child(2)").text();
|
|
String country = "USA";
|
|
String brief = parse.select(".clear.margintop25 span").text();
|
|
String sponsor = parse.select(".clear tr:nth-child(1) .tabletext2:nth-child(2)").text();
|
|
String projectFunding = parse.select(".clear tr:nth-child(12) .tabletext2:nth-child(2)").text();
|
|
String relatedProject = parse.select(".clear tr:nth-child(20) .tabletext2:nth-child(2)").text();
|
|
|
|
|
|
|
|
String awardInstrument = parse.select(".clear tr:nth-child(6) .tabletext2:nth-child(2)").text();
|
|
String programManager = parse.select(".clear tr:nth-child(7) .tabletext2:nth-child(2)").text();
|
|
String totalIntendedAwardAmount = parse.select(".clear tr:nth-child(10) .tabletext2:nth-child(2)").text();
|
|
String totalAwardedAmountToDate = parse.select(".clear tr:nth-child(11) .tabletext2:nth-child(2)").text();
|
|
String recipientSponsoredResearchOffice = parse.select(".clear tr:nth-child(14) .tabletext2:nth-child(2)").text();
|
|
String sponsorCongressionalDistrict = parse.select(".clear tr:nth-child(15) .tabletext2:nth-child(2)").text();
|
|
String primaryPlaceOfPerformance = parse.select(".clear tr:nth-child(16) .tabletext2:nth-child(2)").text();
|
|
String primaryPlaceOfPerformanceCongressionalDistrict = parse.select(".clear tr:nth-child(17) .tabletext2:nth-child(2)").text();
|
|
String uniqueEntityIdentifier = parse.select(".clear tr:nth-child(18) .tabletext2:nth-child(2)").text();
|
|
String parentUEI = parse.select(".clear tr:nth-child(19) .tabletext2:nth-child(2)").text();
|
|
String primaryProgramSource = parse.select(".clear tr:nth-child(21) .tabletext2:nth-child(2)").text();
|
|
String programReferenceCode = parse.select(".clear tr:nth-child(22) .tabletext2:nth-child(2)").text();
|
|
String programElementCode = parse.select(".clear tr:nth-child(23) .tabletext2:nth-child(2)").text();
|
|
String awardAgencyCode = parse.select(".clear tr:nth-child(24) .tabletext2:nth-child(2)").text();
|
|
String fundAgencyCode = parse.select(".clear tr:nth-child(25) .tabletext2:nth-child(2)").text();
|
|
String assistanceListingNumber = parse.select(".clear tr:nth-child(26) .tabletext2:nth-child(2)").text();
|
|
String initialAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(3) .tabletext2:nth-child(2)").text());
|
|
String latestAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(4) .tabletext2:nth-child(2)").text());
|
|
|
|
List<Map<String, Object>> citations = extractAllCitationInfo(html);
|
|
Map<String,Object> data = new HashMap<>();
|
|
data.put("title",title);
|
|
data.put("projectNum",projectNum);
|
|
data.put("projectLeader",projectLeader);
|
|
data.put("projectStartTime",projectStartTime);
|
|
data.put("projectEndTime",projectEndTime);
|
|
data.put("sponsorPart",sponsorPart);
|
|
data.put("country",country);
|
|
data.put("brief",brief);
|
|
data.put("sponsor",sponsor);
|
|
data.put("projectFunding",projectFunding);
|
|
data.put("relatedProject",relatedProject);
|
|
data.put("awardInstrument",awardInstrument);
|
|
data.put("programManager",programManager);
|
|
data.put("totalIntendedAwardAmount",totalIntendedAwardAmount);
|
|
data.put("totalAwardedAmountToDate",totalAwardedAmountToDate);
|
|
data.put("recipientSponsoredResearchOffice",recipientSponsoredResearchOffice);
|
|
data.put("sponsorCongressionalDistrict",sponsorCongressionalDistrict);
|
|
data.put("primaryPlaceOfPerformance",primaryPlaceOfPerformance);
|
|
data.put("primaryPlaceOfPerformanceCongressionalDistrict",primaryPlaceOfPerformanceCongressionalDistrict);
|
|
data.put("uniqueEntityIdentifier",uniqueEntityIdentifier);
|
|
data.put("parentUEI",parentUEI);
|
|
data.put("primaryProgramSource",primaryProgramSource);
|
|
data.put("programReferenceCode",programReferenceCode);
|
|
data.put("programElementCode",programElementCode);
|
|
data.put("awardAgencyCode",awardAgencyCode);
|
|
data.put("fundAgencyCode",fundAgencyCode);
|
|
data.put("assistanceListingNumber",assistanceListingNumber);
|
|
data.put("publications",citations);
|
|
data.put("initialAmendmentDate",initialAmendmentDate);
|
|
data.put("latestAmendmentDate",latestAmendmentDate);
|
|
data.put("crawlUrl",url);
|
|
data.put("crawlTime",localDateTime());
|
|
Map<String,Object> result = new HashMap<>();
|
|
result.put("keyword",keyword);
|
|
result.put("data",data);
|
|
try {
|
|
String jsonValue = objectMapper.writeValueAsString(result);
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, projectNum, jsonValue);
|
|
|
|
producer.send(record, (metadata, exception) -> {
|
|
if (exception == null) {
|
|
System.out.println("成功发送到Kafka - Partition: " + metadata.partition() +
|
|
", Offset: " + metadata.offset());
|
|
} else {
|
|
System.err.println("发送到Kafka失败: " + exception.getMessage());
|
|
}
|
|
});
|
|
} catch (Exception e) {
|
|
System.err.println("序列化或发送Kafka消息失败: " + e.getMessage());
|
|
|
|
}
|
|
|
|
Thread.sleep(sleepTime);
|
|
}
|
|
load = increaseOffsetBy30(load);
|
|
}
|
|
|
|
} catch (Exception e) {
|
|
System.err.println("处理 " + keyword + " 失败: " + e.getMessage());
|
|
e.printStackTrace();
|
|
}
|
|
});
|
|
}
|
|
executor.shutdown();
|
|
executor.awaitTermination(5, TimeUnit.HOURS);
|
|
producer.close();
|
|
}
|
|
|
|
public static String convertToTimestamp(String dateStr) {
|
|
try {
|
|
// Parse "Jan. 9, 2025" (abbreviated month, dot, comma-separated)
|
|
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
|
|
LocalDate date = LocalDate.parse(dateStr, inputFormatter);
|
|
|
|
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
|
|
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
return date.atStartOfDay().format(outputFormatter);
|
|
} catch (Exception e) {
|
|
e.printStackTrace();
|
|
return null;
|
|
}
|
|
|
|
}
|
|
public static String convertToTimestamp2(String dateStr) {
|
|
try {
|
|
// 移除 "(Estimated)" 部分
|
|
String cleanDateStr = dateStr.replace(" (Estimated)", "").trim();
|
|
|
|
// Parse "June 30, 2025" (full month, day, comma-separated year)
|
|
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
|
|
LocalDate date = LocalDate.parse(cleanDateStr, inputFormatter);
|
|
|
|
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
|
|
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
return date.atStartOfDay().format(outputFormatter);
|
|
} catch (Exception e) {
|
|
e.printStackTrace();
|
|
return null;
|
|
}
|
|
}
|
|
public static List<Map<String, Object>> extractAllCitationInfo(String html) {
|
|
Document doc = Jsoup.parse(html);
|
|
List<Map<String, Object>> citations = new ArrayList<>();
|
|
|
|
// 选择所有 margintop15
|
|
Elements marginDivs = doc.select(".margintop15");
|
|
Pattern urlPattern = Pattern.compile("javascript:popwin\\('(.*?)'\\)");
|
|
|
|
for (Element div : marginDivs) {
|
|
Map<String, Object> info = new HashMap<>();
|
|
|
|
// 提取 span 中的文本
|
|
Elements spans = div.select("> span");
|
|
if (spans.size() >= 3) {
|
|
info.put("authors", spans.get(0).text());
|
|
info.put("title", spans.get(1).text());
|
|
info.put("year", spans.get(2).text());
|
|
}
|
|
|
|
// 提取链接
|
|
Elements links = div.select("a");
|
|
String doiUrl = "";
|
|
String citationUrl = "";
|
|
for (Element link : links) {
|
|
String href = link.attr("href");
|
|
Matcher matcher = urlPattern.matcher(href);
|
|
if (matcher.find()) {
|
|
String url = matcher.group(1);
|
|
if (link.text().contains("doi.org") && doiUrl.isEmpty()) {
|
|
doiUrl = url;
|
|
} else if (link.text().contains("引用详细信息") && citationUrl.isEmpty()) {
|
|
citationUrl = url;
|
|
}
|
|
}
|
|
}
|
|
info.put("doiUrl", doiUrl);
|
|
info.put("citationUrl", citationUrl);
|
|
|
|
// 添加到结果列表
|
|
citations.add(info);
|
|
}
|
|
|
|
return citations;
|
|
}
|
|
public static String localDateTime(){
|
|
LocalDateTime dateTime = LocalDateTime.now();
|
|
|
|
// 创建 DateTimeFormatter,定义日期时间的格式
|
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
// 使用 formatter 格式化 LocalDateTime
|
|
String formattedDateTime = dateTime.format(formatter);
|
|
|
|
return formattedDateTime; // 输出类似: 2025-04-08 13:45:30
|
|
}
|
|
public static String increaseOffsetBy30(String originalPayload) {
|
|
// 以 "|" 分割载荷为数组
|
|
String[] parts = originalPayload.split("\\|");
|
|
|
|
// 检查数组长度,确保有足够元素
|
|
if (parts.length < 4) {
|
|
throw new IllegalArgumentException("载荷格式无效,元素不足");
|
|
}
|
|
|
|
// 找到倒数第 4 个元素的位置
|
|
int targetIndex = parts.length - 4;
|
|
|
|
try {
|
|
// 将倒数第 4 个数字解析为整数
|
|
int currentOffset = Integer.parseInt(parts[targetIndex]);
|
|
// 增加 30
|
|
int newOffset = currentOffset + 30;
|
|
// 将新值放回数组
|
|
parts[targetIndex] = String.valueOf(newOffset);
|
|
|
|
// 重新拼接载荷
|
|
return String.join("|", parts);
|
|
} catch (NumberFormatException e) {
|
|
throw new IllegalArgumentException("倒数第 4 个元素不是有效数字: " + parts[targetIndex]);
|
|
}
|
|
}
|
|
private static Response executeWithRetry(OkHttpClient client, Request request, String keyword) throws IOException {
|
|
int maxRetries = proxyList.isEmpty() ? 1 : proxyList.size(); // 如果没有代理,只尝试一次
|
|
int attempt = 0;
|
|
|
|
while (attempt < maxRetries) {
|
|
Response response = client.newCall(request).execute();
|
|
if (response.code() == 403) {
|
|
System.out.println("收到 403 状态码,尝试切换代理重试...");
|
|
response.close();
|
|
switchProxy();
|
|
client = createClientWithProxy(); // 使用新代理重建客户端
|
|
attempt++;
|
|
if (attempt == maxRetries) {
|
|
throw new IOException("所有代理尝试失败,仍然收到 403");
|
|
}
|
|
continue;
|
|
}
|
|
return response; // 成功或非 403 状态码,直接返回
|
|
}
|
|
throw new IOException("无法执行请求,未获取响应");
|
|
}
|
|
private static OkHttpClient createClientWithProxy() {
|
|
OkHttpClient.Builder builder = new OkHttpClient().newBuilder()
|
|
.connectTimeout(30, TimeUnit.SECONDS)
|
|
.readTimeout(30, TimeUnit.SECONDS)
|
|
.writeTimeout(30, TimeUnit.SECONDS);
|
|
|
|
if (!proxyList.isEmpty() && currentProxyIndex < proxyList.size()) {
|
|
String proxy = proxyList.get(currentProxyIndex);
|
|
String[] proxyParts = proxy.split(":");
|
|
if (proxyParts.length == 2) {
|
|
String proxyHost = proxyParts[0];
|
|
int proxyPort = Integer.parseInt(proxyParts[1]);
|
|
builder.proxy(new java.net.Proxy(java.net.Proxy.Type.HTTP,
|
|
new java.net.InetSocketAddress(proxyHost, proxyPort)));
|
|
System.out.println("使用代理: " + proxy);
|
|
}
|
|
}
|
|
return builder.build();
|
|
}
|
|
private static synchronized void switchProxy() {
|
|
if (proxyList.isEmpty()) return;
|
|
currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
|
|
System.out.println("切换到新代理: " + proxyList.get(currentProxyIndex));
|
|
}
|
|
}
|