From 9f3717325b8e822550fb4a4c009bc6575355d0e4 Mon Sep 17 00:00:00 2001
From: 55007 <55007@maojian>
Date: Tue, 7 Jan 2025 16:57:05 +0800
Subject: [PATCH] Knowledge graph application (deprecated)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .classpath                                         |  40 +++
 .project                                           |  23 ++
 .settings/org.eclipse.core.resources.prefs         |   4 +
 .settings/org.eclipse.jdt.core.prefs               |   9 +
 .settings/org.eclipse.m2e.core.prefs               |   4 +
 pom.xml                                            | 254 ++++++++++++++++
 .../com/bfd/crawl_zstp/CrawlZstpApplication.java   |  19 ++
 .../bfd/crawl_zstp/controller/ApiController.java   |  36 +++
 .../crawl_zstp/service/DataCorrelationService.java |  12 +
 .../bfd/crawl_zstp/service/DataProcessService.java |  13 +
 .../service/impl/DataCorrelationServiceImpl.java   | 332 +++++++++++++++++++++
 .../service/impl/DataProcessServiceImpl.java       | 114 +++++++
 .../java/com/bfd/crawl_zstp/util/AsyncConfig.java  |  37 +++
 src/main/java/com/bfd/crawl_zstp/util/Config.java  |  32 ++
 .../java/com/bfd/crawl_zstp/util/Constants.java    | 153 ++++++++++
 .../com/bfd/crawl_zstp/util/ESClientFactory.java   |  58 ++++
 .../com/bfd/crawl_zstp/util/EsExecPorcess.java     |  87 ++++++
 .../java/com/bfd/crawl_zstp/util/EsQueryUtil.java  | 123 ++++++++
 src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java |  81 +++++
 src/main/resources/application.yml                 |  36 +++
 src/main/resources/logback-spring.xml              |  38 +++
 21 files changed, 1505 insertions(+)
 create mode 100644 .classpath
 create mode 100644 .project
 create mode 100644 .settings/org.eclipse.core.resources.prefs
 create mode 100644 .settings/org.eclipse.jdt.core.prefs
 create mode 100644 .settings/org.eclipse.m2e.core.prefs
 create mode 100644 pom.xml
 create mode 100644 src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/controller/ApiController.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/Config.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/Constants.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java
 create mode 100644 src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java
 create mode 100644 src/main/resources/application.yml
 create mode 100644 src/main/resources/logback-spring.xml

diff --git a/.classpath b/.classpath
new file mode 100644
index 0000000..f7e4a1d
--- /dev/null
+++ b/.classpath
@@ -0,0 +1,40 @@

diff --git a/.project b/.project
new file mode 100644
index 0000000..cdaf1eb
--- /dev/null
+++ b/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>crawl_zstp</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.m2e.core.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.eclipse.m2e.core.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index
0000000..abdea9a
--- /dev/null
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,4 @@
+eclipse.preferences.version=1
+encoding//src/main/java=UTF-8
+encoding//src/main/resources=UTF-8
+encoding/<project>=UTF-8
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..71df522
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,9 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
+org.eclipse.jdt.core.compiler.release=disabled
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs
new file mode 100644
index 0000000..f897a7f
--- /dev/null
+++ b/.settings/org.eclipse.m2e.core.prefs
@@ -0,0 +1,4 @@
+activeProfiles=
+eclipse.preferences.version=1
+resolveWorkspaceProjects=true
+version=1
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..995d5a9
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,254 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<packaging>jar</packaging>
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.2.4.RELEASE</version>
+	</parent>
+	<groupId>com.bfd</groupId>
+	<artifactId>crawl_zstp</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<name>crawl_zstp</name>
+	<description>crawl_zstp</description>
+	<properties>
+		<java.version>1.8</java.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>fastjson</artifactId>
+			<version>2.0.12</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.hutool</groupId>
+			<artifactId>hutool-all</artifactId>
+			<version>5.8.10</version>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>6.0.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-client</artifactId>
+			<version>6.0.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
+			<version>6.0.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch.client</groupId>
+					<artifactId>elasticsearch-rest-client</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>2.7.1</version>
+		</dependency>
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>4.9.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>4.5.14</version>
+		</dependency>
+		<dependency>
+			<groupId>de.codecentric</groupId>
+			<artifactId>spring-boot-admin-client</artifactId>
+			<version>2.2.4</version>
+		</dependency>
+	</dependencies>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+				<configuration>
+					<excludes>
+						<exclude>
+							<groupId>org.projectlombok</groupId>
+							<artifactId>lombok</artifactId>
+						</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<archive>
+						<addMavenDescriptor>false</addMavenDescriptor>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<classpathPrefix>lib/</classpathPrefix>
+							<mainClass>com.bfd.crawl_zstp.CrawlZstpApplication</mainClass>
+						</manifest>
+						<manifestEntries>
+							<Class-Path>config/</Class-Path>
+						</manifestEntries>
+					</archive>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+				<configuration>
+					<layout>ZIP</layout>
+					<includes>
+						<include>
+							<groupId>${project.groupId}</groupId>
+							<artifactId>${project.groupId}</artifactId>
+						</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy-lib</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy-dependencies</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>target/lib</outputDirectory>
+							<excludeTransitive>false</excludeTransitive>
+							<stripVersion>false</stripVersion>
+							<includeScope>compile</includeScope>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<archive>
+						<manifest>
+							<classpathPrefix>resources/</classpathPrefix>
+						</manifest>
+					</archive>
+					<excludes>
+						<exclude>*.yml</exclude>
+						<exclude>*.xml</exclude>
+						<exclude>mqConfig/*.xml</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy-dependencies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy-resources</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/resources</outputDirectory>
+							<resources>
+								<resource>
+									<directory>src/main/resources</directory>
+								</resource>
+							</resources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java b/src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java
new file mode 100644
index 0000000..6dad830
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java
@@ -0,0 +1,19 @@
+package com.bfd.crawl_zstp;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Application entry point.
+ * @author guowei
+ */
+@SpringBootApplication
+public class CrawlZstpApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(CrawlZstpApplication.class, args);
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/controller/ApiController.java b/src/main/java/com/bfd/crawl_zstp/controller/ApiController.java
new file mode 100644
index 0000000..0070dc9
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/controller/ApiController.java
@@ -0,0 +1,36 @@
+package com.bfd.crawl_zstp.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.crawl_zstp.service.DataProcessService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+
+/**
+ * @author guowei
+ */
+@Slf4j
+@RestController
+@RequestMapping(value = "/tupu")
+@CrossOrigin(origins = "*", maxAge = 3600)
+public class ApiController {
+    @Resource
+    private DataProcessService dataProcessService;
+
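+    // Expected request shape (inferred from DataProcessServiceImpl; all keys other than
+    // "app_code", "source_data_id", "input", "atlas", "data", "businessKey" and "isLast" vary per caller):
+    //   { "app_code": "...", "source_data_id": "...",
+    //     "input":  { "atlas": [ { "from_ontology_id": "...", "to_ontology_id": "...", ... } ] },
+    //     "data":   { "businessKey": "...", "<uuid>": "<record json>", ..., "isLast": true } }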
+    @RequestMapping(value = "/correlation", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public String getchannelitems(@RequestBody JSONObject jsonObject) {
+        log.info("Received request: " + jsonObject);
+        dataProcessService.process(jsonObject);
+        return "success";
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java b/src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java
new file mode 100644
index 0000000..258d815
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java
@@ -0,0 +1,12 @@
+package com.bfd.crawl_zstp.service;
+
+/**
+ * @author guowei
+ */
+public interface DataCorrelationService {
+
+    void correlationData(String businessKey);
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java b/src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java
new file mode 100644
index 0000000..95f9af8
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java
@@ -0,0 +1,13 @@
+package com.bfd.crawl_zstp.service;
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * @author guowei
+ */
+public interface DataProcessService {
+
+    void process(JSONObject jsonObject);
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java b/src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java
new file mode 100644
index 0000000..3e279c4
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java
@@ -0,0 +1,332 @@
+package com.bfd.crawl_zstp.service.impl;
+
+import cn.hutool.core.util.IdUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSONPath;
+import com.bfd.crawl_zstp.service.DataCorrelationService;
+import com.bfd.crawl_zstp.util.Config;
+import com.bfd.crawl_zstp.util.EsExecPorcess;
+import com.bfd.crawl_zstp.util.EsQueryUtil;
+import com.bfd.crawl_zstp.util.KfkUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author guowei
+ */
+@Service
+@Slf4j
+public class DataCorrelationServiceImpl implements DataCorrelationService {
+
+    @Override
+    @Async
+    public void correlationData(String businessKey) {
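+        // Edge specs in "atlas" address record fields as "<label>:<JSONPath>" (e.g. a hypothetical
+        // "content:$.title"): the part after the first ':' is evaluated against the record via JSONPath.eval.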
+        JSONObject atlas = (JSONObject) JSON.toJSON(Config.dataMap.get(businessKey));
+        JSONArray dataList = (JSONArray) JSON.toJSON(Config.dataListMap.get(businessKey));
+        Map<String, Object> appMap = (Map<String, Object>) Config.appMap.get(businessKey);
+        String app_code = (String) appMap.get("app_code");
+        String subject_id = (String) appMap.get("subject_id");
+        log.debug("atlas: " + atlas + ", dataList: " + dataList);
+        dataList.forEach(value -> {
+            try {
+                if (value == null || value.equals("null")) {
+                    return;
+                }
+                JSONObject result = (JSONObject) value;
+                if (!result.containsKey("pageType")) {
+                    log.error("Data group ID --> " + businessKey + ", record has no pageType, value --> " + value);
+                    return;
+                }
+                String pageType = result.getString("pageType");
+                if (businessKey.contains("fakeNews")) {
+                    log.info("Fake-news data group");
+                    if (pageType.equals("fakeNewsPage") && (atlas.containsKey("storyDetailPage_fakeNewsPage") || atlas.containsKey("fakeNewsPage_storyDetailPage"))) {
+                        log.info("Found fake-news record");
+                        JSONObject atla_comment = new JSONObject();
+                        if (atlas.containsKey("storyDetailPage_fakeNewsPage")) {
+                            atla_comment = atlas.getJSONObject("storyDetailPage_fakeNewsPage");
+                        } else if (atlas.containsKey("fakeNewsPage_storyDetailPage")) {
+                            atla_comment = atlas.getJSONObject("fakeNewsPage_storyDetailPage");
+                        }
+                        String to_content_comment = atla_comment.getString("to_content");
+                        String[] to_split_comment = to_content_comment.split(":");
+                        String to_value_comment = to_split_comment[1];
+                        String to_comment = (String) JSONPath.eval(result, to_value_comment);
+                        String postId = result.getString("postId");
+                        if ("假新闻".equals(to_comment)) {
+                            // "假新闻" ("fake news") is the upstream recognition label; recolor the post's nodes
+                            log.info("Recognized as fake news, updating the post's color");
+                            String ontology_icon = atla_comment.getString("to_ontology_icon");
+                            EsQueryUtil.queryOtherNewsFromEs("from_id", postId, ontology_icon);
+                            EsQueryUtil.queryOtherNewsFromEs("to_id", postId, ontology_icon);
+                        }
+                        Map<String, String> results = new HashMap<>();
+                        results.put("results", JSON.toJSONString(atla_comment));
+                        Map<String, Map<String, String>> request = (Map<String, Map<String, String>>) Config.requestMap.get(businessKey);
+                        request.put("result", results);
+                        KfkUtil.sendKafka(JSON.toJSONString(request));
+                    }
+                } else {
+                    if (pageType.equals("storyDetailPage")) {
+                        log.info("Correlating main post");
+                        if (atlas.containsKey("storyDetailPage_storyDetailPage")) {
+                            log.info("Pushing the post directly: post --> author");
+                            JSONObject atla = atlas.getJSONObject("storyDetailPage_storyDetailPage");
+                            String from_content = atla.getString("from_content");
+                            String from_id = atla.getString("from_id");
+                            String to_content = atla.getString("to_content");
+                            String to_id = atla.getString("to_id");
+                            String[] from_split = from_content.split(":");
+                            String[] to_split = to_content.split(":");
+                            String[] from_id_split = from_id.split(":");
+                            String[] to_id_split = to_id.split(":");
+                            String from_value = from_split[1];
+                            String to_value = to_split[1];
+                            String from_id_value = from_id_split[1];
+                            String to_id_value = to_id_split[1];
+                            String from = (String) JSONPath.eval(result, from_value);
+                            String to = (String) JSONPath.eval(result, to_value);
+                            JSONObject atla_detail_new = new JSONObject();
+                            atla_detail_new.put("ontology_id", atla.get("ontology_id"));
+                            atla_detail_new.put("to_ontology_id", atla.get("to_ontology_id"));
+                            atla_detail_new.put("from_ontology_id", atla.get("from_ontology_id"));
+                            atla_detail_new.put("ontology_label", atla.get("ontology_label"));
+                            atla_detail_new.put("from_ontology_label", atla.get("from_ontology_label"));
+                            atla_detail_new.put("to_ontology_type", atla.get("to_ontology_type"));
+                            atla_detail_new.put("from_ontology_icon", atla.get("from_ontology_icon"));
+                            atla_detail_new.put("to_ontology_icon", atla.get("to_ontology_icon"));
+                            atla_detail_new.put("to_ontology_label", atla.get("to_ontology_label"));
+                            atla_detail_new.put("from_ontology_type", atla.get("from_ontology_type"));
+                            atla_detail_new.put("from_content", from);
+                            atla_detail_new.put("from_id", JSONPath.eval(result, from_id_value));
+                            atla_detail_new.put("to_content", to);
+                            atla_detail_new.put("to_id", JSONPath.eval(result, to_id_value));
+                            atla_detail_new.put("app_code", app_code);
+                            atla_detail_new.put("subject_id", subject_id);
+                            Map<String, String> results = new HashMap<>();
+                            results.put("results", JSON.toJSONString(atla_detail_new));
+                            Map<String, Map<String, String>> request = (Map<String, Map<String, String>>) Config.requestMap.get(businessKey);
+                            request.put("result", results);
+                            KfkUtil.sendKafka(JSON.toJSONString(request));
+                            EsExecPorcess.save(atla_detail_new, IdUtil.randomUUID());
+                            log.info("post --> author pushed: " + JSON.toJSONString(atla_detail_new));
+                        }
+                        if (atlas.containsKey("storyDetailPage_themePage") || atlas.containsKey("themePage_storyDetailPage")) {
+                            log.info("Theme edge present, correlating the topic");
+                            JSONObject atla_themePage = new JSONObject();
+                            if (atlas.containsKey("storyDetailPage_themePage")) {
+                                atla_themePage = atlas.getJSONObject("storyDetailPage_themePage");
+                            } else if (atlas.containsKey("themePage_storyDetailPage")) {
+                                atla_themePage = atlas.getJSONObject("themePage_storyDetailPage");
+                            }
+                            String from_content_comment = atla_themePage.getString("from_content");
+                            String to_content_comment = atla_themePage.getString("to_content");
+                            String[] from_split_comment = from_content_comment.split(":");
+                            String from_value_comment = from_split_comment[1];
+                            String from_comment = (String) JSONPath.eval(result, from_value_comment);
+                            String from_id_comment = atla_themePage.getString("from_id");
+                            String[] from_id_split_comment = from_id_comment.split(":");
+                            String from_id_value_comment = from_id_split_comment[1];
+                            if (from_comment.contains(to_content_comment)) {
+                                log.info("Topic matched");
+                                JSONObject atla_comment_new = new JSONObject();
+                                atla_comment_new.put("ontology_id", atla_themePage.get("ontology_id"));
+                                atla_comment_new.put("to_ontology_id", atla_themePage.get("to_ontology_id"));
+                                atla_comment_new.put("from_ontology_id", atla_themePage.get("from_ontology_id"));
+                                atla_comment_new.put("ontology_label", atla_themePage.get("ontology_label"));
+                                atla_comment_new.put("from_ontology_label", atla_themePage.get("from_ontology_label"));
+                                atla_comment_new.put("to_ontology_type", atla_themePage.get("to_ontology_type"));
+                                atla_comment_new.put("from_ontology_icon", atla_themePage.get("from_ontology_icon"));
+                                atla_comment_new.put("to_ontology_icon", atla_themePage.get("to_ontology_icon"));
+                                atla_comment_new.put("to_ontology_label", atla_themePage.get("to_ontology_label"));
+                                atla_comment_new.put("from_ontology_type", atla_themePage.get("from_ontology_type"));
+                                atla_comment_new.put("from_content", from_comment);
+                                atla_comment_new.put("from_id", JSONPath.eval(result, from_id_value_comment));
+                                atla_comment_new.put("to_content", to_content_comment);
+                                atla_comment_new.put("to_id", atla_themePage.get("to_id"));
+                                atla_comment_new.put("app_code", app_code);
+                                atla_comment_new.put("subject_id", subject_id);
+                                Map<String, String> results2 = new HashMap<>();
+                                results2.put("results", JSON.toJSONString(atla_comment_new));
+                                Map<String, Map<String, String>> request2 = (Map<String, Map<String, String>>) Config.requestMap.get(businessKey);
+                                request2.put("result", results2);
+                                KfkUtil.sendKafka(JSON.toJSONString(request2));
+                                EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID());
+                                log.info("Topic correlated and pushed: " + JSON.toJSONString(atla_comment_new));
+                            }
+                        }
+                        String postId = result.getString("postId");
+                        log.info("Iterating comments/reposts");
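+                        // Pair this post with its comments/reposts by postId; note this rescans the whole
+                        // dataList for every post, so correlation is O(n^2) in the group size.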
atla_comment_new.put("to_content", to_content_comment); + atla_comment_new.put("to_id", atla_themePage.get("to_id")); + atla_comment_new.put("app_code", app_code); + atla_comment_new.put("subject_id", subject_id); + System.out.println(JSON.toJSONString(atla_comment_new)); + Map results2 = new HashMap<>(); + results2.put("results", JSON.toJSONString(atla_comment_new)); + Map> request2 = (Map)Config.requestMap.get(businessKey); + request2.put("result", results2); + KfkUtil.sendKafka(JSON.toJSONString(request2)); + EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID()); + log.info("话题关联成功 推送成功:" + JSON.toJSONString(atla_comment_new)); + } + } + String postId = result.getString("postId"); + log.info("遍历评论/转发"); + dataList.forEach(value2 -> { + if (value2 == null || value2.equals("null")) { + return; + } + JSONObject result2 = (JSONObject) value2; + String pageType2 = result2.getString("pageType"); + String postId2 = result2.getString("postId"); + if (pageType2.equals("socialComment") && postId.equals(postId2)) { + JSONObject atla_comment = new JSONObject(); + if (atlas.containsKey("socialComment_storyDetailPage")|| atlas.containsKey("storyDetailPage_socialComment")) { + log.info("找到此帖子评论"); + if (atlas.containsKey("socialComment_storyDetailPage")) { + atla_comment = atlas.getJSONObject("socialComment_storyDetailPage"); + } else if (atlas.containsKey("storyDetailPage_socialComment")) { + atla_comment = atlas.getJSONObject("storyDetailPage_socialComment"); + } + String from_comtent_comment = atla_comment.getString("from_content"); + String to_content_comment = atla_comment.getString("to_content"); + String[] from_split_comment = from_comtent_comment.split(":"); + String[] to_split_comment = to_content_comment.split(":"); + String from_value_comment = from_split_comment[1]; + String to_value_comment = to_split_comment[1]; + String from_comment = (String) JSONPath.eval(result, from_value_comment); + String to_comment = (String) JSONPath.eval(result2, to_value_comment); + String from_id_comment = atla_comment.getString("from_id"); + String to_id_comment = atla_comment.getString("to_id"); + String[] from_id_split_comment = from_id_comment.split(":"); + String[] to_id_split_comment = to_id_comment.split(":"); + JSONObject atla_comment_new = new JSONObject(); + atla_comment_new.put("ontology_id", atla_comment.get("ontology_id")); + atla_comment_new.put("to_ontology_id", atla_comment.get("to_ontology_id")); + atla_comment_new.put("from_ontology_id", atla_comment.get("from_ontology_id")); + atla_comment_new.put("ontology_label", atla_comment.get("ontology_label")); + atla_comment_new.put("from_ontology_label", atla_comment.get("from_ontology_label")); + atla_comment_new.put("to_ontology_type", atla_comment.get("to_ontology_type")); + atla_comment_new.put("from_ontology_icon", atla_comment.get("from_ontology_icon")); + atla_comment_new.put("to_ontology_icon", atla_comment.get("to_ontology_icon")); + atla_comment_new.put("to_ontology_label", atla_comment.get("to_ontology_label")); + atla_comment_new.put("from_ontology_type", atla_comment.get("from_ontology_type")); + atla_comment_new.put("from_content", from_comment); + atla_comment_new.put("from_id", JSONPath.eval(result, from_id_split_comment[1])); + atla_comment_new.put("to_content", to_comment); + atla_comment_new.put("to_id", JSONPath.eval(result2, to_id_split_comment[1])); + atla_comment_new.put("app_code", app_code); + atla_comment_new.put("subject_id", subject_id); + System.out.println(JSON.toJSONString(atla_comment_new)); + Map results2 = new 
HashMap<>(); + results2.put("results", JSON.toJSONString(atla_comment_new)); + Map> request2 = (Map) Config.requestMap.get(businessKey); + request2.put("result", results2); + KfkUtil.sendKafka(JSON.toJSONString(request2)); + EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID()); + log.info("评论关联成功 推送成功:" + JSON.toJSONString(atla_comment_new)); + } + } else if (pageType2.equals("socialFollow") && postId.equals(postId2)) { + if (atlas.containsKey("socialFollow_storyDetailPage") || atlas.containsKey("storyDetailPage_socialFollow")) { + log.info("找到此帖子转发"); + JSONObject atla_comment = new JSONObject(); + if (atlas.containsKey("socialFollow_storyDetailPage")) { + atla_comment = atlas.getJSONObject("socialFollow_storyDetailPage"); + } else if (atlas.containsKey("storyDetailPage_socialFollow")) { + atla_comment = atlas.getJSONObject("storyDetailPage_socialFollow"); + } + String from_comtent_comment = atla_comment.getString("from_content"); + String to_content_comment = atla_comment.getString("to_content"); + String[] from_split_comment = from_comtent_comment.split(":"); + String[] to_split_comment = to_content_comment.split(":"); + String from_value_comment = from_split_comment[1]; + String to_value_comment = to_split_comment[1]; + String from_comment = (String) JSONPath.eval(result, from_value_comment); + String to_comment = (String) JSONPath.eval(result2, to_value_comment); + String from_id_comment = atla_comment.getString("from_id"); + String to_id_comment = atla_comment.getString("to_id"); + String[] from_id_split_comment = from_id_comment.split(":"); + String[] to_id_split_comment = to_id_comment.split(":"); + JSONObject atla_comment_new = new JSONObject(); + atla_comment_new.put("ontology_id", atla_comment.get("ontology_id")); + atla_comment_new.put("to_ontology_id", atla_comment.get("to_ontology_id")); + atla_comment_new.put("from_ontology_id", atla_comment.get("from_ontology_id")); + atla_comment_new.put("ontology_label", atla_comment.get("ontology_label")); + atla_comment_new.put("from_ontology_label", atla_comment.get("from_ontology_label")); + atla_comment_new.put("to_ontology_type", atla_comment.get("to_ontology_type")); + atla_comment_new.put("from_ontology_icon", atla_comment.get("from_ontology_icon")); + atla_comment_new.put("to_ontology_icon", atla_comment.get("to_ontology_icon")); + atla_comment_new.put("to_ontology_label", atla_comment.get("to_ontology_label")); + atla_comment_new.put("from_ontology_type", atla_comment.get("from_ontology_type")); + atla_comment_new.put("from_content", from_comment); + atla_comment_new.put("from_id", JSONPath.eval(result, from_id_split_comment[1])); + atla_comment_new.put("to_content", to_comment); + atla_comment_new.put("to_id", JSONPath.eval(result2, to_id_split_comment[1])); + atla_comment_new.put("app_code", app_code); + atla_comment_new.put("subject_id", subject_id); + System.out.println(JSON.toJSONString(atla_comment_new)); + Map results2 = new HashMap<>(); + results2.put("results", JSON.toJSONString(atla_comment_new)); + Map> request2 = (Map) Config.requestMap.get(businessKey); + request2.put("result", results2); + KfkUtil.sendKafka(JSON.toJSONString(request2)); + EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID()); + log.info("转发关联成功,推送成功:" + JSON.toJSONString(atla_comment_new)); + } + } + }); + } + else if (pageType.equals("userInfoPage")) { + log.info("关联水军"); + dataList.forEach(value2 -> { + if (value2 == null || value2.equals("null")) { + return; + } + JSONObject result2 = (JSONObject) value2; + String pageType2 = 
+                            if (pageType2.equals("userAuthenPage") && (atlas.containsKey("userInfoPage_userAuthenPage") || atlas.containsKey("userAuthenPage_userInfoPage"))) {
+                                log.info("Found a water-army record");
+                                JSONObject atla_comment = new JSONObject();
+                                if (atlas.containsKey("userInfoPage_userAuthenPage")) {
+                                    atla_comment = atlas.getJSONObject("userInfoPage_userAuthenPage");
+                                } else if (atlas.containsKey("userAuthenPage_userInfoPage")) {
+                                    atla_comment = atlas.getJSONObject("userAuthenPage_userInfoPage");
+                                }
+                                String to_content_comment = atla_comment.getString("to_content");
+                                String[] to_split_comment = to_content_comment.split(":");
+                                String to_value_comment = to_split_comment[1];
+                                String to_comment = (String) JSONPath.eval(result2, to_value_comment);
+                                log.info("recognitionResult: " + to_comment);
+                                to_comment = to_comment.replace("\\", "");
+                                String authorId = result.getString("authorId");
+                                if ("机器人".equals(to_comment)) {
+                                    // "机器人" ("bot") is the upstream recognition label; recolor the user's nodes
+                                    log.info("Water-army account recognized as a bot, updating the user's color");
+                                    String ontology_icon = atla_comment.getString("to_ontology_icon");
+                                    EsQueryUtil.queryOtherNewsFromEs("from_id", authorId, ontology_icon);
+                                    EsQueryUtil.queryOtherNewsFromEs("to_id", authorId, ontology_icon);
+                                }
+                                Map<String, String> results2 = new HashMap<>();
+                                results2.put("results", JSON.toJSONString(atla_comment));
+                                Map<String, Map<String, String>> request2 = (Map<String, Map<String, String>>) Config.requestMap.get(businessKey);
+                                request2.put("result", results2);
+                                KfkUtil.sendKafka(JSON.toJSONString(request2));
+                            }
+                        });
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Data group ID --> " + businessKey + ": processing failed", e);
+            }
+        });
+        log.info("Data group ID --> " + businessKey + ": correlation finished, clearing cache");
+        Config.dataMap.remove(businessKey);
+        Config.dataListMap.remove(businessKey);
+        Config.appMap.remove(businessKey);
+        Config.requestMap.remove(businessKey);
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java b/src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java
new file mode 100644
index 0000000..f3ab71e
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java
@@ -0,0 +1,114 @@
+package com.bfd.crawl_zstp.service.impl;
+
+import cn.hutool.core.util.IdUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.crawl_zstp.service.DataCorrelationService;
+import com.bfd.crawl_zstp.service.DataProcessService;
+import com.bfd.crawl_zstp.util.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author guowei
+ */
+@Service
+@Slf4j
+public class DataProcessServiceImpl implements DataProcessService {
+    @Resource
+    DataCorrelationService dataCorrelationService;
+
+    @Override
+    public void process(JSONObject jsonObject) {
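+        // Flow: (1) cache the ontology-edge specs from input.atlas per data group; (2) append each
+        // UUID-keyed record to the group's list; (3) when the "isLast" marker arrives, hand the whole
+        // group to DataCorrelationService.correlationData, which runs asynchronously via @Async.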
+        JSONObject data = jsonObject.getJSONObject("data");
+        String businessKey = data.getString("businessKey");
+        if (data.toJSONString().contains("userAuthenPage")) {
+            log.info("Water-army request, rewriting the data group ID");
+            businessKey = IdUtil.randomUUID() + "_user";
+        }
+        if (data.toJSONString().contains("fakeNewsPage")) {
+            log.info("Fake-news request, rewriting the data group ID");
+            businessKey = IdUtil.randomUUID() + "_fakeNews";
+        }
+        log.info("Data group ID --> " + businessKey + "; data --> " + jsonObject);
+        if (!Config.dataMap.containsKey(businessKey)) {
+            Config.dataMap.put(businessKey, new HashMap<>());
+        }
+        // Cache the ontology-edge definitions of this data group
+        JSONObject input = jsonObject.getJSONObject("input");
+        JSONArray atlas = input.getJSONArray("atlas");
+        String app_code = jsonObject.getString("app_code");
+        String finalBusinessKey = businessKey;
+        atlas.forEach(value -> {
+            try {
+                JSONObject item = (JSONObject) JSON.toJSON(value);
+                String from_ontology_id = item.getString("from_ontology_id");
+                String to_ontology_id = item.getString("to_ontology_id");
+                String key = from_ontology_id + "_" + to_ontology_id;
+                Map<String, JSONObject> atla = (Map<String, JSONObject>) Config.dataMap.get(finalBusinessKey);
+                if (!atla.containsKey(key)) {
+                    atla.put(key, item);
+                    Config.dataMap.put(finalBusinessKey, atla);
+                }
+            } catch (Exception e) {
+                log.error("Failed to process an atlas entry, value --> " + value, e);
+            }
+        });
+        if (!Config.dataListMap.containsKey(finalBusinessKey)) {
+            Config.dataListMap.put(finalBusinessKey, new ArrayList<>());
+        }
+        if (!Config.appMap.containsKey(finalBusinessKey)) {
+            String subject_id = jsonObject.getString("source_data_id");
+            Map<String, String> appmap = new HashMap<>();
+            appmap.put("app_code", app_code);
+            appmap.put("subject_id", subject_id);
+            Config.appMap.put(finalBusinessKey, appmap);
+        }
+        // UUID-keyed entries of "data" carry the individual page records
+        for (String key : data.keySet()) {
+            if (key.matches("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")) {
+                String uuidValue = data.getString(key);
+                log.info("Key: " + key + ", Value: " + uuidValue);
+                List<JSONObject> dataList = (List<JSONObject>) Config.dataListMap.get(finalBusinessKey);
+                JSONObject parseObject = JSONObject.parseObject(uuidValue);
+                if (parseObject == null) {
+                    log.error("Empty record --> " + uuidValue);
+                    continue;
+                }
+                parseObject.put("app_code", app_code);
+                dataList.add(parseObject);
+                Config.dataListMap.put(finalBusinessKey, dataList);
+                Config.requestMap.put(finalBusinessKey, jsonObject);
+            }
+        }
+        // The end marker means the group is complete and can be correlated
+        if (data.containsKey("isLast")) {
+            log.info("Data group ID --> " + finalBusinessKey + ", final call received, starting correlation");
+            dataCorrelationService.correlationData(finalBusinessKey);
+        }
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java b/src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java
new file mode 100644
index 0000000..0a72f99
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java
@@ -0,0 +1,37 @@
+package com.bfd.crawl_zstp.util;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Async task thread pool.
+ * @EnableAsync switches on Spring's async support, so methods annotated with @Async run off the
+ * caller's thread. Async execution needs a pool to run on, so this configuration class implements
+ * AsyncConfigurer and overrides getAsyncExecutor to supply a custom ThreadPoolTaskExecutor.
+ * @author guowei
+ */
+@Configuration
+@EnableAsync
+public class AsyncConfig implements AsyncConfigurer {
+
+    @Override
+    public Executor getAsyncExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        // Core pool size
+        threadPoolTaskExecutor.setCorePoolSize(10);
+        // Maximum pool size
+        threadPoolTaskExecutor.setMaxPoolSize(50);
+        // Queue capacity
+        threadPoolTaskExecutor.setQueueCapacity(200);
+        // Initialize the pool
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/Config.java b/src/main/java/com/bfd/crawl_zstp/util/Config.java
new file mode 100644
index 0000000..e7f32d7
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/Config.java
@@ -0,0 +1,32 @@
+package com.bfd.crawl_zstp.util;
+
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Shared in-memory caches, keyed by data group ID (businessKey).
+ * @author guowei
+ */
+@Component
+public class Config {
+
+    /**
+     * Ontology-edge definitions per data group
+     */
+    public static Map<String, Object> dataMap = new HashMap<>();
+    /**
+     * Accumulated records per data group
+     */
+    public static Map<String, Object> dataListMap = new HashMap<>();
+
+    /**
+     * app_code / subject_id per data group
+     */
+    public static Map<String, Object> appMap = new HashMap<>();
+
+    /**
+     * Data matching queue
+     */
+    public static LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
+
+    /**
+     * Original request per data group
+     */
+    public static Map<String, Object> requestMap = new HashMap<>();
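+
+    // Note: these are plain HashMaps mutated both by web threads and by the @Async correlation pool;
+    // ConcurrentHashMap would be the safer choice if calls for the same data group can overlap.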
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/Constants.java b/src/main/java/com/bfd/crawl_zstp/util/Constants.java
new file mode 100644
index 0000000..6ad2ee6
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/Constants.java
@@ -0,0 +1,153 @@
+package com.bfd.crawl_zstp.util;
+
+/**
+ * Constant definitions.
+ * @author jian.mao
+ * @date 2022-11-15
+ */
+public class Constants {
+
+    /************************* Blueprint constant key names *********************************/
+    public final static String SCHEDULING = "scheduling";
+    public final static String TYPE = "type";
+    public final static String INTERVAL = "interval";
+    public final static String CREATED = "created";
+    public final static String LAST_EDIT = "last_edit";
+    public final static String BLUEPRINT_ID = "blueprint_id";
+    public final static String BLUEPRINTID = "blueprintId";
+    public final static String BLUEPRINT_NAME = "name";
+    public final static String SCENARIO = "scenario";
+    public final static String AUTOCOMMITTRIGGERLAST = "autoCommitTriggerLast";
+    public final static String FRESHVARIABLES = "freshVariables";
+    public final static String AUTOCOMMIT = "autoCommit";
+    public final static String MAXERRORS = "maxErrors";
+    public final static String DATALOSS = "dataloss";
+    public final static String POSITION = "position";
+    public final static String SCENES_ID = "scenes_id";
+    public final static String SCENESID = "scenesId";
+    public final static String MULTI_BRANCH = "multi_branch";
+
+    public final static String SINGLE = "single";
+    /** Number of retries so far **/
+    public final static String ERROR_TIME = "error_time";
+    public final static String PREVIOUS_RESULT = "previous_result";
+
+    /**** Data id *****/
+    public final static String BUSINESSKEY = "businessKey";
+
+    /************************* metadata constant key names *********************************/
+    public final static String LABEL_COL = "label_col";
+    public final static String LABEL = "label";
+    public final static String USER = "user";
+    public final static String ADMIN = "admin";
+    public final static String ADDRESS = "address";
+    public final static String DATASOURCE = "datasource";
+    public final static String INDEX = "index";
+
+    /************************* app constant key names *********************************/
+    public final static String APPS = "apps";
+    public final static String TRANSFER_ID = "transfer_id";
+    public final static String MODULE = "module";
+    public final static String VERSION = "version";
+    public final static String METADATA = "metadata";
+    public final static String APP_NAME = "name";
+    public final static String DESCRIBE = "describe";
+    public final static String NEXT_APP_ID = "next_app_id";
+    public final static String EDGE_ID = "edge_id";
+    public final static String START_ID = "start_id";
+    public final static String END_ID = "end_id";
+
+    public final static String WAIT_CONDITION = "wait_condition";
+    public final static String START_TAG = "start_tag";
+
+    /************************* module types *********************************/
+    public final static String FILE = "file";
+    public final static String OCR = "OCR";
+    public final static String FILTER = "Filter";
+    public final static String CHATGPT = "ChatGPT";
+    public final static String MYSQL = "mysql";
+
+    /************************* other types *********************************/
+    public final static String UNDERLINE = "_";
+    public final static String RESULT_TOPIC = null;
+    public static final String EMPTY = "";
+    public static final String HTTP = "http";
+    public static final String REQUEST_ERROR_MESSAGE = "Download failed error is";
+    public static final String REQUEST_RESULT = "result";
+    public static final String REQUEST_RESULT_RESULTS = "results";
+    public static final String MAP_TYPE = "Map";
+    public static final String LIST_TYPE = "List";
+    public static final String STRING_TYPE = "String";
+    public static final String DOCUMENT_TYPE = "doc";
+    public static final String FILTER_ZH = "过滤器";
+
+    public static final String JSON_SELE_SYMBOL = "$.";
+    public static final String LEFT_BRACKETS = "[";
+    public static final String RIGTH_BRACKETS = "]";
+    public static final String TASKTYPE = "taskType";
+    public static final Integer USER_TYPE = 1;
+    public static final Integer KEYWORD_TYPE = 0;
+    public static final Integer DETAIL_TYPE = 2;
+    public static final String CID = "cid";
+    public static final String SITETYPE = "siteType";
+    public static final Integer DEFULT_SUBJECTID = 304864;
+    public static final Integer DEFULT_CRAWLCYCLICITYTIME = 1440;
+    public static final String CRAWLENDTIME = "crawlEndTime";
+    public static final String CRAWLSTARTTIME = "crawlStartTime";
+    public static final String CRAWLPAGETYPES = "crawlPageTypes";
+    public static final String APPID = "113ic";
+    public static final String APP_ID = "appId";
+    public final static String ID = "id";
+    public static final Integer DEFULT_CRAWLPERIODHOUR = 24;
+    public static final String CREATEUSERID = "662015832180933762";
+    public static final String CRAWL_ADD_URL = "https://caiji.percent.cn/api/crawl/remote/task/save";
+    public static final String CRAWLKEYWORD = "crawlKeyword";
+    public static final String ATTACHTAG = "attachTag";
+    public static final String ATTACHTAG_VALUE = "analyze";
+    public static final String KEYWORD = "keyword";
+    public static final String SITEID = "siteId";
+    public static final String RESULTS = "results";
+    public static final String RESULT = "result";
+    public static final String CRAWLDATAFLAG = "crawlDataFlag";
+    public static final String CRAWLDATAFLAG_PREFIX = "\"crawlDataFlag\":\"keyword:";
+    public static final String TID = "tid";
+    public static final Long TIME_OUT = 1800000L;
+    public static final String ATTR = "attr";
+    public static final String HASVIDEO = "hasVideo";
+    public static final String CRAWL_END_MARK = "crawl_end_mark";
+    public static final String CRAWL_END_MESSAGE = "crawl_end_message";
+    public static final String CRAWL_END_MESSAGE_VALUE = "数据采集完成";
+    public static final String SUBJECTID = "subjectId";
+    public static final String TASKID = "taskId";
+    public static final int SUCCESS_CODE = 200;
+    public static final String WEB_URL_SUFFIX = "/api/aogeo/api/cda/caiji/status";
+    public static final String STATUS = "status";
+
+    /************************ redis *************************************/
+    public static final String LOCK_KEY = "myLock";
+    public static final long LOCK_EXPIRE_TIME = 300000;
+
+    /************************ application parameters *************************************/
+    public static final String CODE = "code";
+    public static final String MESSAGE = "message";
+    public static final String INPUT = "input";
+    public static final String OUTPUT = "output";
+    public static final String FORM = "form";
+    public static final String FIELD = "field";
+    public static final String VALUE = "value";
+    public static final String DATA = "data";
+    public static final String COLON_EN = ":";
+    public static final String DATABASE = "database";
+    public static final String TABLE = "table";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String PORT = "port";
+    public static final String HOSTNAME = "hostname";
+    public static final String DATATYPE = "dataType";
+    public static final String RULES = "rules";
+    public static final String GENID = "genId";
+    public static final String KEY = "key";
+    public static final String DATAID = "dataId";
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java b/src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java
new file mode 100644
index 0000000..5174f7c
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java
@@ -0,0 +1,58 @@
+package com.bfd.crawl_zstp.util;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+
+public class ESClientFactory {
+    private static final int CONNECT_TIME_OUT = 1000;
+    private static final int SOCKET_TIME_OUT = 30000;
+    private static final int CONNECTION_REQUEST_TIME_OUT = 500;
+    private static final int MAX_CONNECT_NUM = 100;
+    private static final int MAX_CONNECT_PER_ROUTE = 100;
+    private static boolean uniqueConnectTimeConfig = false;
+    private static boolean uniqueConnectNumConfig = true;
+
+    private static final String HOST = "localhost";
+    private static final int PORT = 9200;
+    private static final String SCHEME = "http";
+    // Replace with your Elasticsearch username
+    private static final String USERNAME = "elastic";
+    // Replace with your Elasticsearch password
+    private static final String PASSWORD = "baifendian";
+
+    public static RestHighLevelClient init() {
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(USERNAME, PASSWORD));
+        // Note: the builder uses a hard-coded address rather than the HOST/PORT/SCHEME constants above
+        RestClientBuilder builder = RestClient.builder(new HttpHost("172.18.1.147", 9200, "http"))
+                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
+                        .setDefaultCredentialsProvider(credentialsProvider));
+        return new RestHighLevelClient(builder);
+    }
+
+    private static RestClientBuilder.RequestConfigCallback getRequestConfigCallback() {
+        if (uniqueConnectTimeConfig) {
+            // Adjust the connection timeouts here if needed
+            return requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(5000);
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java b/src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java
new file mode 100644
index 0000000..b483394
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java
@@ -0,0 +1,87 @@
+package com.bfd.crawl_zstp.util;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @PROJECT_NAME: result_assembly
+ * @AUTHOR: jian.mao
+ * @DATE: 2023/11/1 17:06
+ */
+@Component
+@Slf4j
+public class EsExecPorcess {
+
+    /**
+     * Index a document into ES via the REST API.
+     * @param data   document source
+     * @param dataId document id
+     * @return the document that was written
+     */
+    public static Map<String, Object> save(Map<String, Object> data, String dataId) {
+        // Credentials provider for basic auth
+        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "baifendian"));
+        // HttpClient with the credentials attached
+        CloseableHttpClient httpClient = HttpClients.custom()
+                .setDefaultCredentialsProvider(credentialsProvider)
+                .build();
+        try {
+            StringBuilder url = new StringBuilder("http://");
+            url.append("172.18.1.147");
+            url.append(":");
+            url.append("9200");
+            url.append("/");
+            url.append("youzhi_cda_kg_relation");
+            url.append("/_doc/");
+            url.append(dataId);
+            HttpPost httpPost = new HttpPost(url.toString());
+            StringEntity entity = new StringEntity(JSONObject.toJSONString(data), ContentType.APPLICATION_JSON);
+            httpPost.setEntity(entity);
+            // Send the request and read the response
+            HttpResponse response = httpClient.execute(httpPost);
+            int statusCode = response.getStatusLine().getStatusCode();
+            String responseBody = EntityUtils.toString(response.getEntity());
+            // 201 = created, 200 = existing document updated
+            int code = 201;
+            int updateCode = 200;
+            if (statusCode == code) {
+                log.info("Document written: {}", responseBody);
+            } else if (statusCode == updateCode) {
+                log.info("Document already existed and was updated: {}", responseBody);
+            } else {
+                log.error("Document write failed: {}", responseBody);
+            }
+        } catch (Exception e) {
+            log.error("Document write error:", e);
+        } finally {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                log.error("Error closing the HttpClient:", e);
+            }
+        }
+        return data;
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java b/src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java
new file mode 100644
index 0000000..41e9df2
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java
@@ -0,0 +1,123 @@
+package com.bfd.crawl_zstp.util;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @author guowei
+ */
+@Component
+@Slf4j
+public class EsQueryUtil {
+    private static RestHighLevelClient restClient = ESClientFactory.init();
+
+    @SneakyThrows
+    public static void queryOtherNewsFromEs(String type, String authorId, String to_ontology_icon) {
+        try {
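+            // Scroll pattern: run the initial search with a 10-minute scroll window, keep calling
+            // searchScroll with the returned scrollId until a page comes back empty, then clear the
+            // scroll context so ES can release the search resources.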
+            log.info("Querying ES, type: " + type + ", id: " + authorId + ", ontology_icon: " + to_ontology_icon);
+            // Scroll window: how long ES keeps this search context alive between batches
+            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));
+            SearchRequest searchRequest = new SearchRequest("youzhi_cda_kg_relation");
+            searchRequest.scroll(scroll);
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+            // Batch size per scroll page
+            searchSourceBuilder.size(1000);
+            searchSourceBuilder.query(QueryBuilders.matchQuery(type, authorId));
+            searchRequest.source(searchSourceBuilder);
+            SearchResponse searchResponse = null;
+            int retryCount = 1;
+            while (true) {
+                if (retryCount <= 5) {
+                    try {
+                        searchResponse = restClient.search(searchRequest);
+                        break;
+                    } catch (Exception e) {
+                        log.warn("Query failed, sleeping before retry", e);
+                        Thread.sleep(5000);
+                    }
+                } else {
+                    break;
+                }
+                retryCount++;
+            }
+            String scrollId = searchResponse.getScrollId();
+            SearchHit[] searchHits = searchResponse.getHits().getHits();
+            log.info("Initial batch size: " + searchHits.length);
+            // Process the first batch
+            getIndex(searchHits, type, to_ontology_icon);
+            if (searchHits.length == 1000) {
+                // Keep scrolling until a page comes back empty
+                while (searchHits != null && searchHits.length > 0) {
+                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
+                    scrollRequest.scroll(scroll);
+                    searchResponse = restClient.searchScroll(scrollRequest);
+                    scrollId = searchResponse.getScrollId();
+                    searchHits = searchResponse.getHits().getHits();
+                    if (searchHits != null && searchHits.length > 0) {
+                        log.info("--- next page ----- batch size " + searchHits.length);
+                        // Process this batch
+                        getIndex(searchHits, type, to_ontology_icon);
+                    }
+                }
+            }
+            Thread.sleep(5000);
+            // Clear the scroll context; setScrollIds() could clear several scroll ids at once
+            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+            clearScrollRequest.addScrollId(scrollId);
+            ClearScrollResponse clearScrollResponse = restClient.clearScroll(clearScrollRequest);
+            boolean succeeded = clearScrollResponse.isSucceeded();
+            log.info("ID: " + authorId + " -> query finished, length: " + searchHits.length);
+        } catch (Exception e) {
+            log.error("ES query failed", e);
+        }
+    }
+
+    private static void getIndex(SearchHit[] searchHits, String type, String ontology_icon) {
+        for (SearchHit searchHit : searchHits) {
+            String id = searchHit.getId();
+            Map<String, Object> obj = searchHit.getSourceAsMap();
+            switch (type) {
+                case "from_id":
+                    obj.put("from_ontology_icon", ontology_icon);
+                    break;
+                case "to_id":
+                    obj.put("to_ontology_icon", ontology_icon);
+                    break;
+                default:
+                    break;
+            }
+            // Write the recolored document back under the same id
+            EsExecPorcess.save(obj, id);
+        }
+    }
+
+    public static void main(String[] args) {
+        queryOtherNewsFromEs("to_id", "782640277", "#FF6655");
+    }
+}
diff --git a/src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java b/src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java
new file mode 100644
index 0000000..06280db
--- /dev/null
+++ b/src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java
@@ -0,0 +1,81 @@
+package com.bfd.crawl_zstp.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+/**
+ * Kafka helper.
+ * @author guowei
+ */
+@Component
+@Slf4j
+public class KfkUtil {
+    private static String topic;
+
+    private static String brokerList;
+
+    @Value("${crawl.kafka.topic}")
+    public void setTopic(String topic) {
+        KfkUtil.topic = topic;
+    }
+
+    @Value("${crawl.kafka.brokers}")
+    public void setBrokerList(String brokerList) {
+        KfkUtil.brokerList = brokerList;
+    }
+
+    private static KafkaProducer<String, String> kafkaProducer;
+
+    /**
+     * Get the lazily created KafkaProducer instance
+     */
+    public static KafkaProducer<String, String> getProducer() {
+        if (kafkaProducer == null) {
+            Properties props = new Properties();
+            // Kafka broker list
+            props.put("bootstrap.servers", brokerList);
+            // A message counts as committed only once all in-sync replicas have acknowledged it
+            props.put("acks", "all");
+            // Retry failed sends up to 3 times
+            props.put("retries", 3);
+            // Batch size in bytes for buffered records
+            props.put("batch.size", 16384);
+            // Wait up to 1 ms before sending so small batches can fill up
+            props.put("linger.ms", 1);
+            // Total memory the producer may use for buffering
+            props.put("buffer.memory", 33554432);
+            props.put("key.serializer",
+                    "org.apache.kafka.common.serialization.StringSerializer");
+            props.put("value.serializer",
+                    "org.apache.kafka.common.serialization.StringSerializer");
+            kafkaProducer = new KafkaProducer<>(props);
+        }
+        return kafkaProducer;
+    }
+
+    /**
+     * Close the KafkaProducer instance
+     */
+    public static void closeProducer() {
+        if (kafkaProducer != null) {
+            log.info("----------close producer----------");
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+    }
+
+    public static void sendKafka(String resultData) {
+        KafkaProducer<String, String> producer = getProducer();
+        ProducerRecord<String, String> se = new ProducerRecord<>(topic, resultData);
+        producer.send(se);
+        log.info("Message sent to Kafka");
+    }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..6cac8cd
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,36 @@
+crawl:
+  kafka:
+    topic: analyze
+    brokers: 172.16.12.55:9092,172.16.12.56:9092,172.16.12.57:9092
+  task:
+    taskData: ./data/taskData.txt
+server:
+  port: 9999
+# Log level
+logging:
+  level:
+    com:
+      bfd: INFO
+  # Log path
+  log:
+    path: ./logs
+spring:
+  boot:
+    admin:
+      client:
+        url: http://172.16.12.55:8001
+#        instance:
+#          service-base-url: http://10.10.114.17:9999
+  application:
+    name: zstp
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+  endpoint:
+    health:
+      show-details: always
+  health:
+    elasticsearch:
+      enabled: false
\ No newline at end of file
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..0c59240
--- /dev/null
+++ b/src/main/resources/logback-spring.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <springProperty scope="context" name="logging.path" source="logging.log.path"/>
+    <springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <append>true</append>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>${logging.level}</level>
+        </filter>
+        <file>${logging.path}/crawlSchedule.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <maxHistory>7</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="FILE"/>
+    </root>
+</configuration>
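
A minimal smoke test for the endpoint added above (payload shape inferred from DataProcessServiceImpl; the UUID key and every value below are illustrative, not real data):

    curl -X POST http://localhost:9999/tupu/correlation \
      -H 'Content-Type: application/json' \
      -d '{
            "app_code": "demo",
            "source_data_id": "1",
            "input": {"atlas": [{"from_ontology_id": "storyDetailPage", "to_ontology_id": "storyDetailPage",
                                 "from_content": "content:$.title", "from_id": "id:$.postId",
                                 "to_content": "author:$.author", "to_id": "id:$.authorId"}]},
            "data": {"businessKey": "demo-group",
                     "123e4567-e89b-12d3-a456-426614174000": "{\"pageType\":\"storyDetailPage\",\"postId\":\"p1\",\"authorId\":\"u1\",\"title\":\"t\",\"author\":\"a\"}",
                     "isLast": true}
          }'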