package com.example;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.select.Elements;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.io.*;
import java.util.*;

public class getInKa {

    // Shared OkHttp client used for all HTTP requests
    private static final OkHttpClient httpClient = new OkHttpClient();
    // File that records the URLs which have already been processed
    private static final String PROCESSED_URLS_FILE = "processed_urls.txt";

    public static void main(String[] args) {
        try {
            // Collect the list of target article URLs
            System.out.println("Starting URL collection...");
            List<String> urls = getUrls();
            System.out.println("Collected " + urls.size() + " URLs.");

            // Extract the news data from each URL and send it to Kafka
            System.out.println("Starting news extraction...");
            getNews(urls);
            System.out.println("News extraction completed.");
        } catch (IOException | InterruptedException e) {
            System.out.println("Error in main: " + e.getMessage());
        }
    }

    public static List<String> getUrls() throws IOException, InterruptedException {
        List<String> urls = new ArrayList<>();
        Set<String> processedUrls = loadProcessedUrls(); // load previously processed URLs

        for (int page = 1; page <= 28; page++) {
            String url = "https://www.zyctd.com/zixun/201/pz102-" + page + ".html";
            Request request = new Request.Builder()
                    .url(url)
                    .addHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0")
                    .build();

            System.out.println("Fetching page " + page + ": " + url);
            try (Response response = httpClient.newCall(request).execute()) {
                if (response.isSuccessful() && response.body() != null) {
                    System.out.println("Successfully fetched page " + page);
                    String html = response.body().string();
                    Document doc = Jsoup.parse(html);
                    Elements links = doc.select("div.zixun-list > div.zixun-item-box > div.zixun-item-title > p > a");
                    List<String> projectIDs = links.eachAttr("href");
                    System.out.println("Found " + projectIDs.size() + " URLs on page " + page);

                    for (String projectUrl : projectIDs) {
                        if (!processedUrls.contains(projectUrl)) { // skip URLs that were already processed
                            urls.add(projectUrl);
                            processedUrls.add(projectUrl); // mark as processed
                        }
                    }
                } else {
                    System.out.println("Failed to fetch page " + page + ": Status code " + response.code());
                }
            }
            Thread.sleep(1000); // throttle requests between listing pages
        }

        saveProcessedUrls(processedUrls); // persist the processed-URL set
        return urls;
    }

    public static void getNews(List<String> urls) throws IOException {
        for (int i = 0; i < urls.size(); i++) {
            String url = urls.get(i);
            Request request = new Request.Builder()
                    .url(url)
                    .addHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0")
                    .build();

            System.out.println("Processing URL " + (i + 1) + "/" + urls.size() + ": " + url);
            try (Response response = httpClient.newCall(request).execute()) {
                if (response.isSuccessful() && response.body() != null) {
                    System.out.println("Successfully fetched news from " + url);
                    String html = response.body().string();
                    Document doc = Jsoup.parse(html);

                    String title = doc.select("div.info-title.t-center > h1").text().trim();
                    String date = doc.select("div.author.color-grey.art-info > span:nth-child(1)").text().trim();
                    String content = String.join("\n", doc.select("div.info-content > div > p").eachText()).trim();
                    if (content.isEmpty()) {
                        // Fall back to an alternative article layout
                        content = String.join("\n", doc.select("div.info-content > p:nth-child(2)").eachText()).trim();
                    }

                    if (!title.isEmpty() && !date.isEmpty() && !content.isEmpty()) {
                        Map<String, String> news = new HashMap<>();
                        news.put("title", title);
                        news.put("date", date);
                        news.put("content", content);
                        news.put("url", url);
                        System.out.println("Extracted news: " + news.get("title"));
                        saveData(news); // send the extracted record to Kafka
                    } else {
                        System.out.println("Failed to extract complete data from " + url);
                    }
                } else {
                    System.out.println("Failed to fetch news from " + url + ": Status code " + response.code());
                }
            } catch (Exception e) {
                System.out.println("An error occurred while fetching " + url + ": " + e.getMessage());
            }

            try {
                Thread.sleep(5000); // sleep 5 seconds between article requests
            } catch (InterruptedException e) {
                System.out.println("Sleep interrupted: " + e.getMessage());
            }
        }
    }

    public static void saveData(Map<String, String> news) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            String topic = "news-topic";
            String key = news.get("title");
            String value = news.toString();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Data sent successfully to Kafka: topic=" + metadata.topic()
                            + ", partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                } else {
                    System.err.println("Failed to send data to Kafka: " + exception.getMessage());
                }
            }).get(); // block until the send completes
        } catch (Exception e) {
            System.err.println("Error while sending data to Kafka: " + e.getMessage());
        }
    }

    // Load the set of URLs that have already been processed
    private static Set<String> loadProcessedUrls() throws IOException {
        Set<String> processedUrls = new HashSet<>();
        File file = new File(PROCESSED_URLS_FILE);
        if (file.exists()) {
            try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    processedUrls.add(line.trim());
                }
            }
        }
        return processedUrls;
    }

    // Persist the set of processed URLs
    private static void saveProcessedUrls(Set<String> processedUrls) throws IOException {
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(PROCESSED_URLS_FILE))) {
            for (String url : processedUrls) {
                writer.write(url);
                writer.newLine();
            }
        }
    }
}