You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

403 lines
23 KiB

package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class projTopic {
private static final String TOPIC_NAME = "projTopic";
private static final String BOOTSTRAP_SERVERS = "node-01:19092";
private static KafkaProducer<String, String> producer;
private static ObjectMapper objectMapper = new ObjectMapper();
private static final Random random = new Random();
private static List<String> proxyList = new ArrayList<>(); // 代理池
private static int currentProxyIndex = 0; // 当前使用的代理索引
static {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
producer = new KafkaProducer<>(props);
try {
proxyList = Files.readAllLines(Paths.get("proxy.txt"));
if (proxyList.isEmpty()) {
System.out.println("警告: proxy.txt 为空,未加载任何代理");
} else {
System.out.println("成功加载 " + proxyList.size() + " 个代理");
}
} catch (IOException e) {
System.err.println("读取 proxy.txt 失败: " + e.getMessage());
}
}
public static void main(String[] args) throws IOException, InterruptedException {
List<String> keywords = Files.readAllLines(Paths.get("keywords.txt"));
List<String> cleanedKeywords = new ArrayList<>();
for (String keyword : keywords) {
String cleaned = keyword.split(",")[0].trim(); // 取逗号前的部分并去除首尾空格
cleaned = cleaned.replaceAll("\\s+", "+"); // 替换所有空格为 +
cleanedKeywords.add(cleaned);
}
ExecutorService executor = Executors.newFixedThreadPool(4); // 4 个线程
for (String keyword : cleanedKeywords) {
executor.submit(() -> {
try {
int sleepTime = random.nextInt(1001) + 30000;
String load = "5|0|20|https://www.nsf.gov/awardsearch/jsp/gwt/search/|57BE5CA45E781DC0159F727F8A8205EB|gov.nsf.research.awardsearch.gwt.client.SearchAwardService|getAwards|gov.nsf.research.awardsearch.gwt.bean.SearchRequestBean/3930579236|com.extjs.gxt.ui.client.data.PagingLoadConfig|java.util.HashMap/962170901|java.lang.String/2004016611|QueryText|" + keyword + "|ActiveAwards|true|com.extjs.gxt.ui.client.data.BasePagingLoadConfig/2011366567|com.extjs.gxt.ui.client.data.RpcMap/3441186752|sortField|sortDir|com.extjs.gxt.ui.client.Style$SortDir/640452531|offset|java.lang.Integer/3438268394|limit|1|2|3|4|2|5|6|5|7|2|8|9|8|10|8|11|8|12|13|0|1|14|4|15|0|16|17|0|18|19|0|20|19|30|";
for(int i=0;;i++){
OkHttpClient client = createClientWithProxy();
MediaType mediaType = MediaType.parse("text/x-gwt-rpc; charset=UTF-8");
RequestBody body = RequestBody.create(mediaType, load);
Request request = new Request.Builder()
.url("https://www.nsf.gov/awardsearch/jsp/gwt/search/.searchaward")
.method("POST", body)
.addHeader("Content-Type", "text/x-gwt-rpc; charset=UTF-8")
.addHeader("X-GWT-Module-Base", "https://www.nsf.gov/awardsearch/jsp/gwt/search/")
.addHeader("X-GWT-Permutation", "368C3CF86AA4CD7DB2250B35B844C1C2")
// .addHeader("cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
.build();
Response response = executeWithRetry(client, request, keyword);
String content = response.body().string();
Pattern pattern = Pattern.compile("\"awdNumber\",\"(\\d+)\"");
Matcher matcher = pattern.matcher(content);
List<String> numbers = new ArrayList<>(); // 用于存储匹配的数字
// 查找并提取数字
List<String> additionalNumbers = new ArrayList<>();
List<String> urls = new ArrayList<>();
// 查找匹配项
while (matcher.find()) {
// 获取捕获到的数字,并将其添加到列表中
numbers.add(matcher.group(1));
}
// 输出捕获到的数字
if (numbers.isEmpty()) {
System.out.println("没找到awdNumber,继续下一种查找");
} else {
for (String number : numbers) {
additionalNumbers.add(number);
}
}
Pattern additionalPattern = Pattern.compile("\"[^\"]+\",\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\"(?:,\"(?:\\d{2}/\\d{2}/\\d{4}|\\d+\\.\\d+)\")?,\"(\\d+)\"");
Matcher additionalMatcher = additionalPattern.matcher(content);
while (additionalMatcher.find()) {
additionalNumbers.add(additionalMatcher.group(1));
}
if (additionalNumbers.isEmpty()) {
System.out.println("没找到下一页内容链接");
Thread.sleep(sleepTime);
break;
} else {
for (String number : additionalNumbers) {
String url = "https://www.nsf.gov/awardsearch/showAward?AWD_ID=" + number + "&HistoricalAwards=false";
urls.add(url);
}
}
if (!urls.isEmpty() && urls.get(0).equals("https://www.nsf.gov/awardsearch/showAward?AWD_ID=2446604&HistoricalAwards=false")) {
System.out.println("第一个 URL 是 AWD_ID=2446604,跳过关键词: " + keyword);
Thread.sleep(sleepTime);
return; // 跳出当前任务,处理下一个关键词
}
for(String url:urls){
OkHttpClient client2 = createClientWithProxy();
MediaType mediaType2 = MediaType.parse("text/plain");
RequestBody body2 = RequestBody.create(mediaType2, "");
Request request2 = new Request.Builder()
.url(url)
.get()
// .addHeader("Cookie", "JSESSIONID=E9DCB88F6AD2241C9973AFEC03158ECB")
.build();
Response response2 = executeWithRetry(client2, request2, keyword);
System.out.println(response2.code());
String html = response2.body().string();
Document parse = Jsoup.parse(html);
String title = parse.select(".pageheadline").text();
String projectNum = parse.select(".clear tr:nth-child(5) .tabletext2:nth-child(2)").text();
String projectLeader = parse.select(".clear tr:nth-child(13) .tabletext2:nth-child(2)").text();
String projectStartTime = convertToTimestamp(parse.select(".clear tr:nth-child(8) .tabletext2:nth-child(2)").text());
String projectEndTime = convertToTimestamp2(parse.select(".clear tr:nth-child(9) .tabletext2:nth-child(2)").text());
String sponsorPart = parse.select(".clear tr:nth-child(2) .tabletext2:nth-child(2)").text();
String country = "USA";
String brief = parse.select(".clear.margintop25 span").text();
String sponsor = parse.select(".clear tr:nth-child(1) .tabletext2:nth-child(2)").text();
String projectFunding = parse.select(".clear tr:nth-child(12) .tabletext2:nth-child(2)").text();
String relatedProject = parse.select(".clear tr:nth-child(20) .tabletext2:nth-child(2)").text();
String awardInstrument = parse.select(".clear tr:nth-child(6) .tabletext2:nth-child(2)").text();
String programManager = parse.select(".clear tr:nth-child(7) .tabletext2:nth-child(2)").text();
String totalIntendedAwardAmount = parse.select(".clear tr:nth-child(10) .tabletext2:nth-child(2)").text();
String totalAwardedAmountToDate = parse.select(".clear tr:nth-child(11) .tabletext2:nth-child(2)").text();
String recipientSponsoredResearchOffice = parse.select(".clear tr:nth-child(14) .tabletext2:nth-child(2)").text();
String sponsorCongressionalDistrict = parse.select(".clear tr:nth-child(15) .tabletext2:nth-child(2)").text();
String primaryPlaceOfPerformance = parse.select(".clear tr:nth-child(16) .tabletext2:nth-child(2)").text();
String primaryPlaceOfPerformanceCongressionalDistrict = parse.select(".clear tr:nth-child(17) .tabletext2:nth-child(2)").text();
String uniqueEntityIdentifier = parse.select(".clear tr:nth-child(18) .tabletext2:nth-child(2)").text();
String parentUEI = parse.select(".clear tr:nth-child(19) .tabletext2:nth-child(2)").text();
String primaryProgramSource = parse.select(".clear tr:nth-child(21) .tabletext2:nth-child(2)").text();
String programReferenceCode = parse.select(".clear tr:nth-child(22) .tabletext2:nth-child(2)").text();
String programElementCode = parse.select(".clear tr:nth-child(23) .tabletext2:nth-child(2)").text();
String awardAgencyCode = parse.select(".clear tr:nth-child(24) .tabletext2:nth-child(2)").text();
String fundAgencyCode = parse.select(".clear tr:nth-child(25) .tabletext2:nth-child(2)").text();
String assistanceListingNumber = parse.select(".clear tr:nth-child(26) .tabletext2:nth-child(2)").text();
String initialAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(3) .tabletext2:nth-child(2)").text());
String latestAmendmentDate = convertToTimestamp(parse.select(".clear tr:nth-child(4) .tabletext2:nth-child(2)").text());
List<Map<String, Object>> citations = extractAllCitationInfo(html);
Map<String,Object> data = new HashMap<>();
data.put("title",title);
data.put("projectNum",projectNum);
data.put("projectLeader",projectLeader);
data.put("projectStartTime",projectStartTime);
data.put("projectEndTime",projectEndTime);
data.put("sponsorPart",sponsorPart);
data.put("country",country);
data.put("brief",brief);
data.put("sponsor",sponsor);
data.put("projectFunding",projectFunding);
data.put("relatedProject",relatedProject);
data.put("awardInstrument",awardInstrument);
data.put("programManager",programManager);
data.put("totalIntendedAwardAmount",totalIntendedAwardAmount);
data.put("totalAwardedAmountToDate",totalAwardedAmountToDate);
data.put("recipientSponsoredResearchOffice",recipientSponsoredResearchOffice);
data.put("sponsorCongressionalDistrict",sponsorCongressionalDistrict);
data.put("primaryPlaceOfPerformance",primaryPlaceOfPerformance);
data.put("primaryPlaceOfPerformanceCongressionalDistrict",primaryPlaceOfPerformanceCongressionalDistrict);
data.put("uniqueEntityIdentifier",uniqueEntityIdentifier);
data.put("parentUEI",parentUEI);
data.put("primaryProgramSource",primaryProgramSource);
data.put("programReferenceCode",programReferenceCode);
data.put("programElementCode",programElementCode);
data.put("awardAgencyCode",awardAgencyCode);
data.put("fundAgencyCode",fundAgencyCode);
data.put("assistanceListingNumber",assistanceListingNumber);
data.put("publications",citations);
data.put("initialAmendmentDate",initialAmendmentDate);
data.put("latestAmendmentDate",latestAmendmentDate);
data.put("crawlUrl",url);
data.put("crawlTime",localDateTime());
Map<String,Object> result = new HashMap<>();
result.put("keyword",keyword);
result.put("data",data);
try {
String jsonValue = objectMapper.writeValueAsString(result);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, projectNum, jsonValue);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("成功发送到Kafka - Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
} else {
System.err.println("发送到Kafka失败: " + exception.getMessage());
}
});
} catch (Exception e) {
System.err.println("序列化或发送Kafka消息失败: " + e.getMessage());
}
Thread.sleep(sleepTime);
}
load = increaseOffsetBy30(load);
}
} catch (Exception e) {
System.err.println("处理 " + keyword + " 失败: " + e.getMessage());
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.HOURS);
producer.close();
}
public static String convertToTimestamp(String dateStr) {
try {
// Parse "Jan. 9, 2025" (abbreviated month, dot, comma-separated)
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
LocalDate date = LocalDate.parse(dateStr, inputFormatter);
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return date.atStartOfDay().format(outputFormatter);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static String convertToTimestamp2(String dateStr) {
try {
// 移除 "(Estimated)" 部分
String cleanDateStr = dateStr.replace(" (Estimated)", "").trim();
// Parse "June 30, 2025" (full month, day, comma-separated year)
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH);
LocalDate date = LocalDate.parse(cleanDateStr, inputFormatter);
// Format to "yyyy-MM-dd HH:mm:ss" (defaulting time to 00:00:00)
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return date.atStartOfDay().format(outputFormatter);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static List<Map<String, Object>> extractAllCitationInfo(String html) {
Document doc = Jsoup.parse(html);
List<Map<String, Object>> citations = new ArrayList<>();
// 选择所有 margintop15
Elements marginDivs = doc.select(".margintop15");
Pattern urlPattern = Pattern.compile("javascript:popwin\\('(.*?)'\\)");
for (Element div : marginDivs) {
Map<String, Object> info = new HashMap<>();
// 提取 span 中的文本
Elements spans = div.select("> span");
if (spans.size() >= 3) {
info.put("authors", spans.get(0).text());
info.put("title", spans.get(1).text());
info.put("year", spans.get(2).text());
}
// 提取链接
Elements links = div.select("a");
String doiUrl = "";
String citationUrl = "";
for (Element link : links) {
String href = link.attr("href");
Matcher matcher = urlPattern.matcher(href);
if (matcher.find()) {
String url = matcher.group(1);
if (link.text().contains("doi.org") && doiUrl.isEmpty()) {
doiUrl = url;
} else if (link.text().contains("引用详细信息") && citationUrl.isEmpty()) {
citationUrl = url;
}
}
}
info.put("doiUrl", doiUrl);
info.put("citationUrl", citationUrl);
// 添加到结果列表
citations.add(info);
}
return citations;
}
public static String localDateTime(){
LocalDateTime dateTime = LocalDateTime.now();
// 创建 DateTimeFormatter,定义日期时间的格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 使用 formatter 格式化 LocalDateTime
String formattedDateTime = dateTime.format(formatter);
return formattedDateTime; // 输出类似: 2025-04-08 13:45:30
}
public static String increaseOffsetBy30(String originalPayload) {
// 以 "|" 分割载荷为数组
String[] parts = originalPayload.split("\\|");
// 检查数组长度,确保有足够元素
if (parts.length < 4) {
throw new IllegalArgumentException("载荷格式无效,元素不足");
}
// 找到倒数第 4 个元素的位置
int targetIndex = parts.length - 4;
try {
// 将倒数第 4 个数字解析为整数
int currentOffset = Integer.parseInt(parts[targetIndex]);
// 增加 30
int newOffset = currentOffset + 30;
// 将新值放回数组
parts[targetIndex] = String.valueOf(newOffset);
// 重新拼接载荷
return String.join("|", parts);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("倒数第 4 个元素不是有效数字: " + parts[targetIndex]);
}
}
private static Response executeWithRetry(OkHttpClient client, Request request, String keyword) throws IOException {
int maxRetries = proxyList.isEmpty() ? 1 : proxyList.size(); // 如果没有代理,只尝试一次
int attempt = 0;
while (attempt < maxRetries) {
Response response = client.newCall(request).execute();
if (response.code() == 403) {
System.out.println("收到 403 状态码,尝试切换代理重试...");
response.close();
switchProxy();
client = createClientWithProxy(); // 使用新代理重建客户端
attempt++;
if (attempt == maxRetries) {
throw new IOException("所有代理尝试失败,仍然收到 403");
}
continue;
}
return response; // 成功或非 403 状态码,直接返回
}
throw new IOException("无法执行请求,未获取响应");
}
private static OkHttpClient createClientWithProxy() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS);
if (!proxyList.isEmpty() && currentProxyIndex < proxyList.size()) {
String proxy = proxyList.get(currentProxyIndex);
String[] proxyParts = proxy.split(":");
if (proxyParts.length == 2) {
String proxyHost = proxyParts[0];
int proxyPort = Integer.parseInt(proxyParts[1]);
builder.proxy(new java.net.Proxy(java.net.Proxy.Type.HTTP,
new java.net.InetSocketAddress(proxyHost, proxyPort)));
System.out.println("使用代理: " + proxy);
}
}
return builder.build();
}
private static synchronized void switchProxy() {
if (proxyList.isEmpty()) return;
currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
System.out.println("切换到新代理: " + proxyList.get(currentProxyIndex));
}
}