
Knowledge graph application (deprecated)

master · commit 9f3717325b · 6 months ago
  1. .classpath (+40)
  2. .project (+23)
  3. .settings/org.eclipse.core.resources.prefs (+4)
  4. .settings/org.eclipse.jdt.core.prefs (+9)
  5. .settings/org.eclipse.m2e.core.prefs (+4)
  6. pom.xml (+254)
  7. src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java (+19)
  8. src/main/java/com/bfd/crawl_zstp/controller/ApiController.java (+36)
  9. src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java (+12)
  10. src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java (+13)
  11. src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java (+332)
  12. src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java (+114)
  13. src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java (+37)
  14. src/main/java/com/bfd/crawl_zstp/util/Config.java (+32)
  15. src/main/java/com/bfd/crawl_zstp/util/Constants.java (+153)
  16. src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java (+58)
  17. src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java (+87)
  18. src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java (+123)
  19. src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java (+81)
  20. src/main/resources/application.yml (+36)
  21. src/main/resources/logback-spring.xml (+38)

.classpath (+40)

@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

.project (+23)

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>crawl_zstp</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

.settings/org.eclipse.core.resources.prefs (+4)

@@ -0,0 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding/<project>=UTF-8

.settings/org.eclipse.jdt.core.prefs (+9)

@@ -0,0 +1,9 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

.settings/org.eclipse.m2e.core.prefs (+4)

@@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

pom.xml (+254)

@@ -0,0 +1,254 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bfd</groupId>
<artifactId>crawl_zstp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>crawl_zstp</name>
<description>crawl_zstp</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.mysql</groupId>-->
<!-- <artifactId>mysql-connector-j</artifactId>-->
<!-- <scope>runtime</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.0.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>javax.servlet</groupId>-->
<!-- <artifactId>javax.servlet-api</artifactId>-->
<!-- <version>4.0.1</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/javax.mail/mail -->
<!-- <dependency>-->
<!-- <groupId>javax.mail</groupId>-->
<!-- <artifactId>mail</artifactId>-->
<!-- <version>1.4.7</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.appium</groupId>-->
<!-- <artifactId>java-client</artifactId>-->
<!-- <version>7.5.1</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.squareup.okio</groupId>-->
<!-- <artifactId>okio</artifactId>-->
<!-- <version>2.8.0</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>druid</artifactId>-->
<!-- <version>1.2.14</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mybatis.spring.boot</groupId>-->
<!-- <artifactId>mybatis-spring-boot-starter</artifactId>-->
<!-- <version>2.2.2</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.httpcomponents</groupId>-->
<!-- <artifactId>httpcore</artifactId>-->
<!-- <version>4.4.16</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.httpcomponents</groupId>-->
<!-- <artifactId>httpclient</artifactId>-->
<!-- <version>4.5.14</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-client</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- ZIP layout plus includes: keep only this project's classes in the boot jar; dependencies live in lib/ -->
<layout>ZIP</layout>
<includes>
<include>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
</include>
</includes>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<!-- Specify the main class; dependencies are referenced as external jars -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<!-- Do not include pom.xml and pom.properties in the generated jar -->
<addMavenDescriptor>false</addMavenDescriptor>
<manifest>
<!-- Add third-party jars to the manifest classpath -->
<addClasspath>true</addClasspath>
<!-- Location of the external dependency jars -->
<classpathPrefix>lib/</classpathPrefix>
<!-- Application main class -->
<mainClass>com.bfd.crawl_zstp.CrawlZstpApplication</mainClass>
</manifest>
<!-- Resolve config/ and resources/ next to the jar at runtime -->
<manifestEntries>
<Class-Path>config/ resources/</Class-Path>
</manifestEntries>
</archive>
<!-- Files kept out of the jar (shipped externally instead) -->
<excludes>
<exclude>*.yml</exclude>
<exclude>*.xml</exclude>
<exclude>mqConfig/*.xml</exclude>
</excludes>
</configuration>
</plugin>
<!-- Copy dependencies to the lib directory outside the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-lib</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>target/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>false</stripVersion>
<includeScope>compile</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<!-- Copy resource files to the external resources directory -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<!-- Output directory for the resource files -->
<outputDirectory>${project.build.directory}/resources</outputDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

src/main/java/com/bfd/crawl_zstp/CrawlZstpApplication.java (+19)

@@ -0,0 +1,19 @@
package com.bfd.crawl_zstp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author guowei
*/
@SpringBootApplication
public class CrawlZstpApplication {
public static void main(String[] args) {
SpringApplication.run(CrawlZstpApplication.class, args);
}
}

src/main/java/com/bfd/crawl_zstp/controller/ApiController.java (+36)

@@ -0,0 +1,36 @@
package com.bfd.crawl_zstp.controller;
import com.alibaba.fastjson.JSONObject;
import com.bfd.crawl_zstp.service.DataProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @author guowei
*/
@Slf4j
@RestController
@RequestMapping(value = "/tupu")
@CrossOrigin(origins = "*", maxAge = 3600)
public class ApiController {
@Resource
private DataProcessService dataProcessService;
@RequestMapping(value = "/correlation", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
public String correlation(@RequestBody JSONObject jsonObject) {
log.info("Received request: " + jsonObject);
dataProcessService.process(jsonObject);
return "success";
}
}
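For reference, here is a minimal sketch of calling the endpoint above with OkHttp (already declared in pom.xml). The payload shape is inferred from what DataProcessServiceImpl reads: a data object carrying businessKey, crawled records as JSON strings under UUID-shaped keys, and an isLast end marker, plus input.atlas, app_code and source_data_id. All concrete values below are hypothetical.

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class CorrelationRequestExample {
    public static void main(String[] args) throws Exception {
        JSONObject payload = new JSONObject();
        JSONObject data = new JSONObject();
        data.put("businessKey", "demo-group-1");            // hypothetical group id
        // Crawled records arrive as JSON strings under UUID-shaped keys
        data.put("123e4567-e89b-42d3-a456-426614174000",
                "{\"pageType\":\"storyDetailPage\",\"postId\":\"p1\"}");
        data.put("isLast", true);                           // end marker: triggers correlation
        payload.put("data", data);
        JSONObject input = new JSONObject();
        input.put("atlas", new JSONArray());                // relation definitions go here
        payload.put("input", input);
        payload.put("app_code", "demo_app");
        payload.put("source_data_id", "demo_subject");

        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
                .url("http://localhost:9999/tupu/correlation")   // port from application.yml
                .post(RequestBody.create(payload.toJSONString(),
                        MediaType.get("application/json")))
                .build();
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.body().string());   // expects "success"
        }
    }
}

Records for one businessKey can arrive over several calls; correlation only starts once a call's data contains isLast.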

src/main/java/com/bfd/crawl_zstp/service/DataCorrelationService.java (+12)

@@ -0,0 +1,12 @@
package com.bfd.crawl_zstp.service;
/**
 * @author guowei
 */
public interface DataCorrelationService {
void correlationData(String businessKey);
}

src/main/java/com/bfd/crawl_zstp/service/DataProcessService.java (+13)

@@ -0,0 +1,13 @@
package com.bfd.crawl_zstp.service;
import com.alibaba.fastjson.JSONObject;
/**
 * @author guowei
 */
public interface DataProcessService {
void process(JSONObject businessKey);
}

src/main/java/com/bfd/crawl_zstp/service/impl/DataCorrelationServiceImpl.java (+332)

@@ -0,0 +1,332 @@
package com.bfd.crawl_zstp.service.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.bfd.crawl_zstp.service.DataCorrelationService;
import com.bfd.crawl_zstp.util.Config;
import com.bfd.crawl_zstp.util.EsExecPorcess;
import com.bfd.crawl_zstp.util.EsQueryUtil;
import com.bfd.crawl_zstp.util.KfkUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* @author guowei
*/
@Service
@Slf4j
public class DataCorrelationServiceImpl implements DataCorrelationService {
@Override
@Async
public void correlationData(String businessKey) {
JSONObject atlas = (JSONObject) JSON.toJSON(Config.dataMap.get(businessKey));
JSONArray dataList = (JSONArray) JSON.toJSON(Config.dataListMap.get(businessKey));
Map appMap = (Map) Config.appMap.get(businessKey);
String app_code = (String) appMap.get("app_code");
String subject_id = (String) appMap.get("subject_id");
log.debug("atlas: " + atlas);
log.debug("dataList: " + dataList);
dataList.forEach(value -> {
try {
if (value == null || value.equals("null")) {
return;
}
JSONObject result = (JSONObject) value;
if (!result.containsKey("pageType")) {
log.error("数据组ID --> " + businessKey + ",没有pageType, value --> " + value);
return;
}
String pageType = result.getString("pageType");
if (businessKey.contains("fakeNews")) {
log.info("Fake-news data group");
if (pageType.equals("fakeNewsPage") && (atlas.containsKey("storyDetailPage_fakeNewsPage") || atlas.containsKey("fakeNewsPage_storyDetailPage")) ) {
log.info("找到真假新闻");
JSONObject atla_comment = new JSONObject();
if (atlas.containsKey("storyDetailPage_fakeNewsPage")) {
atla_comment = atlas.getJSONObject("storyDetailPage_fakeNewsPage");
}else if (atlas.containsKey("fakeNewsPage_storyDetailPage")) {
atla_comment = atlas.getJSONObject("fakeNewsPage_storyDetailPage");
}
String to_content_comment = atla_comment.getString("to_content");
String[] to_split_comment = to_content_comment.split(":");
String to_value_comment = to_split_comment[1];
String to_comment = (String) JSONPath.eval(result, to_value_comment);
String postId = result.getString("postId");
if (to_comment.equals("假新闻")) {
//水军识别为机器人,更新用户的颜色
log.info("真假新闻识别为假新闻,更新帖子的颜色");
String ontology_icon = atla_comment.getString("to_ontology_icon");
EsQueryUtil.queryOtherNewsFromEs("from_id", postId, ontology_icon);
EsQueryUtil.queryOtherNewsFromEs("to_id", postId, ontology_icon);
}
Map<Object, Object> results = new HashMap<>();
results.put("results", JSON.toJSONString(atla_comment));
Map<String, Map<Object, Object>> request = (Map)Config.requestMap.get(businessKey);
request.put("result", results);
KfkUtil.sendKafka(JSON.toJSONString(request));
}
} else {
if (pageType.equals("storyDetailPage")) {
log.info("关联主贴");
if (atlas.containsKey("storyDetailPage_storyDetailPage")) {
log.info("帖子直接推送:帖子 --> 作者");
JSONObject atla = atlas.getJSONObject("storyDetailPage_storyDetailPage");
String from_comtent = atla.getString("from_content");
String from_id = atla.getString("from_id");
String to_content = atla.getString("to_content");
String to_id = atla.getString("to_id");
String[] from_split = from_comtent.split(":");
String[] to_split = to_content.split(":");
String[] from_id_split = from_id.split(":");
String[] to_id_split = to_id.split(":");
String from_value = from_split[1];
String to_value = to_split[1];
String from_id_value = from_id_split[1];
String to_id_value = to_id_split[1];
String from = (String) JSONPath.eval(result, from_value);
String to = (String) JSONPath.eval(result, to_value);
JSONObject atla_detail_new = new JSONObject();
atla_detail_new.put("ontology_id", atla.get("ontology_id"));
atla_detail_new.put("to_ontology_id", atla.get("to_ontology_id"));
atla_detail_new.put("from_ontology_id", atla.get("from_ontology_id"));
atla_detail_new.put("ontology_label", atla.get("ontology_label"));
atla_detail_new.put("from_ontology_label", atla.get("from_ontology_label"));
atla_detail_new.put("to_ontology_type", atla.get("to_ontology_type"));
atla_detail_new.put("from_ontology_icon", atla.get("from_ontology_icon"));
atla_detail_new.put("to_ontology_icon", atla.get("to_ontology_icon"));
atla_detail_new.put("to_ontology_label", atla.get("to_ontology_label"));
atla_detail_new.put("from_ontology_type", atla.get("from_ontology_type"));
atla_detail_new.put("from_content", from);
atla_detail_new.put("from_id", JSONPath.eval(result, from_id_value));
atla_detail_new.put("to_content", to);
atla_detail_new.put("to_id", JSONPath.eval(result, to_id_value));
atla_detail_new.put("app_code", app_code);
atla_detail_new.put("subject_id", subject_id);
Map<Object, Object> results = new HashMap<>();
results.put("results", JSON.toJSONString(atla_detail_new));
Map<String, Map<Object, Object>> request = (Map) Config.requestMap.get(businessKey);
request.put("result", results);
KfkUtil.sendKafka(JSON.toJSONString(request));
EsExecPorcess.save(atla_detail_new, IdUtil.randomUUID());
log.info("帖子 --> 作者 推送成功:" + JSON.toJSONString(atla_detail_new));
}
if (atlas.containsKey("storyDetailPage_themePage") || atlas.containsKey("themePage_storyDetailPage")) {
log.info("有话题,关联话题");
JSONObject atla_themePage = new JSONObject();
if (atlas.containsKey("storyDetailPage_themePage")) {
atla_themePage = atlas.getJSONObject("storyDetailPage_themePage");
} else if (atlas.containsKey("themePage_storyDetailPage")) {
atla_themePage = atlas.getJSONObject("themePage_storyDetailPage");
}
String from_comtent_comment = atla_themePage.getString("from_content");
String to_content_comment = atla_themePage.getString("to_content");
String[] from_split_comment = from_comtent_comment.split(":");
String from_value_comment = from_split_comment[1];
String from_comment = (String) JSONPath.eval(result, from_value_comment);
String from_id_comment = atla_themePage.getString("from_id");
String[] from_id_split_comment = from_id_comment.split(":");
String from_id_value_comment = from_id_split_comment[1];
if (from_comment.contains(to_content_comment)) {
log.info("话题匹配到");
JSONObject atla_comment_new = new JSONObject();
atla_comment_new.put("ontology_id", atla_themePage.get("ontology_id"));
atla_comment_new.put("to_ontology_id", atla_themePage.get("to_ontology_id"));
atla_comment_new.put("from_ontology_id", atla_themePage.get("from_ontology_id"));
atla_comment_new.put("ontology_label", atla_themePage.get("ontology_label"));
atla_comment_new.put("from_ontology_label", atla_themePage.get("from_ontology_label"));
atla_comment_new.put("to_ontology_type", atla_themePage.get("to_ontology_type"));
atla_comment_new.put("from_ontology_icon", atla_themePage.get("from_ontology_icon"));
atla_comment_new.put("to_ontology_icon", atla_themePage.get("to_ontology_icon"));
atla_comment_new.put("to_ontology_label", atla_themePage.get("to_ontology_label"));
atla_comment_new.put("from_ontology_type", atla_themePage.get("from_ontology_type"));
atla_comment_new.put("from_content", from_comment);
atla_comment_new.put("from_id", JSONPath.eval(result, from_id_value_comment));
atla_comment_new.put("to_content", to_content_comment);
atla_comment_new.put("to_id", atla_themePage.get("to_id"));
atla_comment_new.put("app_code", app_code);
atla_comment_new.put("subject_id", subject_id);
Map<Object, Object> results2 = new HashMap<>();
results2.put("results", JSON.toJSONString(atla_comment_new));
Map<String, Map<Object, Object>> request2 = (Map)Config.requestMap.get(businessKey);
request2.put("result", results2);
KfkUtil.sendKafka(JSON.toJSONString(request2));
EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID());
log.info("话题关联成功 推送成功:" + JSON.toJSONString(atla_comment_new));
}
}
String postId = result.getString("postId");
log.info("遍历评论/转发");
dataList.forEach(value2 -> {
if (value2 == null || value2.equals("null")) {
return;
}
JSONObject result2 = (JSONObject) value2;
String pageType2 = result2.getString("pageType");
String postId2 = result2.getString("postId");
if (pageType2.equals("socialComment") && postId.equals(postId2)) {
JSONObject atla_comment = new JSONObject();
if (atlas.containsKey("socialComment_storyDetailPage")|| atlas.containsKey("storyDetailPage_socialComment")) {
log.info("找到此帖子评论");
if (atlas.containsKey("socialComment_storyDetailPage")) {
atla_comment = atlas.getJSONObject("socialComment_storyDetailPage");
} else if (atlas.containsKey("storyDetailPage_socialComment")) {
atla_comment = atlas.getJSONObject("storyDetailPage_socialComment");
}
String from_comtent_comment = atla_comment.getString("from_content");
String to_content_comment = atla_comment.getString("to_content");
String[] from_split_comment = from_comtent_comment.split(":");
String[] to_split_comment = to_content_comment.split(":");
String from_value_comment = from_split_comment[1];
String to_value_comment = to_split_comment[1];
String from_comment = (String) JSONPath.eval(result, from_value_comment);
String to_comment = (String) JSONPath.eval(result2, to_value_comment);
String from_id_comment = atla_comment.getString("from_id");
String to_id_comment = atla_comment.getString("to_id");
String[] from_id_split_comment = from_id_comment.split(":");
String[] to_id_split_comment = to_id_comment.split(":");
JSONObject atla_comment_new = new JSONObject();
atla_comment_new.put("ontology_id", atla_comment.get("ontology_id"));
atla_comment_new.put("to_ontology_id", atla_comment.get("to_ontology_id"));
atla_comment_new.put("from_ontology_id", atla_comment.get("from_ontology_id"));
atla_comment_new.put("ontology_label", atla_comment.get("ontology_label"));
atla_comment_new.put("from_ontology_label", atla_comment.get("from_ontology_label"));
atla_comment_new.put("to_ontology_type", atla_comment.get("to_ontology_type"));
atla_comment_new.put("from_ontology_icon", atla_comment.get("from_ontology_icon"));
atla_comment_new.put("to_ontology_icon", atla_comment.get("to_ontology_icon"));
atla_comment_new.put("to_ontology_label", atla_comment.get("to_ontology_label"));
atla_comment_new.put("from_ontology_type", atla_comment.get("from_ontology_type"));
atla_comment_new.put("from_content", from_comment);
atla_comment_new.put("from_id", JSONPath.eval(result, from_id_split_comment[1]));
atla_comment_new.put("to_content", to_comment);
atla_comment_new.put("to_id", JSONPath.eval(result2, to_id_split_comment[1]));
atla_comment_new.put("app_code", app_code);
atla_comment_new.put("subject_id", subject_id);
Map<Object, Object> results2 = new HashMap<>();
results2.put("results", JSON.toJSONString(atla_comment_new));
Map<String, Map<Object, Object>> request2 = (Map) Config.requestMap.get(businessKey);
request2.put("result", results2);
KfkUtil.sendKafka(JSON.toJSONString(request2));
EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID());
log.info("评论关联成功 推送成功:" + JSON.toJSONString(atla_comment_new));
}
} else if (pageType2.equals("socialFollow") && postId.equals(postId2)) {
if (atlas.containsKey("socialFollow_storyDetailPage") || atlas.containsKey("storyDetailPage_socialFollow")) {
log.info("找到此帖子转发");
JSONObject atla_comment = new JSONObject();
if (atlas.containsKey("socialFollow_storyDetailPage")) {
atla_comment = atlas.getJSONObject("socialFollow_storyDetailPage");
} else if (atlas.containsKey("storyDetailPage_socialFollow")) {
atla_comment = atlas.getJSONObject("storyDetailPage_socialFollow");
}
String from_comtent_comment = atla_comment.getString("from_content");
String to_content_comment = atla_comment.getString("to_content");
String[] from_split_comment = from_comtent_comment.split(":");
String[] to_split_comment = to_content_comment.split(":");
String from_value_comment = from_split_comment[1];
String to_value_comment = to_split_comment[1];
String from_comment = (String) JSONPath.eval(result, from_value_comment);
String to_comment = (String) JSONPath.eval(result2, to_value_comment);
String from_id_comment = atla_comment.getString("from_id");
String to_id_comment = atla_comment.getString("to_id");
String[] from_id_split_comment = from_id_comment.split(":");
String[] to_id_split_comment = to_id_comment.split(":");
JSONObject atla_comment_new = new JSONObject();
atla_comment_new.put("ontology_id", atla_comment.get("ontology_id"));
atla_comment_new.put("to_ontology_id", atla_comment.get("to_ontology_id"));
atla_comment_new.put("from_ontology_id", atla_comment.get("from_ontology_id"));
atla_comment_new.put("ontology_label", atla_comment.get("ontology_label"));
atla_comment_new.put("from_ontology_label", atla_comment.get("from_ontology_label"));
atla_comment_new.put("to_ontology_type", atla_comment.get("to_ontology_type"));
atla_comment_new.put("from_ontology_icon", atla_comment.get("from_ontology_icon"));
atla_comment_new.put("to_ontology_icon", atla_comment.get("to_ontology_icon"));
atla_comment_new.put("to_ontology_label", atla_comment.get("to_ontology_label"));
atla_comment_new.put("from_ontology_type", atla_comment.get("from_ontology_type"));
atla_comment_new.put("from_content", from_comment);
atla_comment_new.put("from_id", JSONPath.eval(result, from_id_split_comment[1]));
atla_comment_new.put("to_content", to_comment);
atla_comment_new.put("to_id", JSONPath.eval(result2, to_id_split_comment[1]));
atla_comment_new.put("app_code", app_code);
atla_comment_new.put("subject_id", subject_id);
Map<Object, Object> results2 = new HashMap<>();
results2.put("results", JSON.toJSONString(atla_comment_new));
Map<String, Map<Object, Object>> request2 = (Map) Config.requestMap.get(businessKey);
request2.put("result", results2);
KfkUtil.sendKafka(JSON.toJSONString(request2));
EsExecPorcess.save(atla_comment_new, IdUtil.randomUUID());
log.info("转发关联成功,推送成功:" + JSON.toJSONString(atla_comment_new));
}
}
});
}
else if (pageType.equals("userInfoPage")) {
log.info("关联水军");
dataList.forEach(value2 -> {
if (value2 == null || value2.equals("null")) {
return;
}
JSONObject result2 = (JSONObject) value2;
String pageType2 = result2.getString("pageType");
if (pageType2.equals("userAuthenPage") && (atlas.containsKey("userInfoPage_userAuthenPage") || atlas.containsKey("userAuthenPage_userInfoPage"))) {
log.info("找到水军");
JSONObject atla_comment = new JSONObject();
if (atlas.containsKey("userInfoPage_userAuthenPage")) {
atla_comment = atlas.getJSONObject("userInfoPage_userAuthenPage");
} else if (atlas.containsKey("userAuthenPage_userInfoPage")) {
atla_comment = atlas.getJSONObject("userAuthenPage_userInfoPage");
}
String from_comtent_comment = atla_comment.getString("from_content");
String to_content_comment = atla_comment.getString("to_content");
String[] to_split_comment = to_content_comment.split(":");
String to_value_comment = to_split_comment[1];
String to_comment = (String) JSONPath.eval(result2, to_value_comment);
log.info("recognitionResult:"+to_comment);
to_comment = to_comment.replace("\\","");
String authorId = result.getString("authorId");
if (to_comment.equals("机器人")) {
//水军识别为机器人,更新用户的颜色
log.info("水军识别为机器人,更新用户的颜色");
String ontology_icon = atla_comment.getString("to_ontology_icon");
EsQueryUtil.queryOtherNewsFromEs("from_id", authorId, ontology_icon);
EsQueryUtil.queryOtherNewsFromEs("to_id", authorId, ontology_icon);
}
Map<Object, Object> results2 = new HashMap<>();
results2.put("results", JSON.toJSONString(atla_comment));
Map<String, Map<Object, Object>> request2 = (Map)Config.requestMap.get(businessKey);
request2.put("result", results2);
KfkUtil.sendKafka(JSON.toJSONString(request2));
}
});
}
}
} catch (Exception e) {
log.error("Data group ID --> " + businessKey + ": processing failed", e);
}
});
log.info("数据组ID --> " + businessKey + ":关联完成,清除缓存");
Config.dataMap.remove(businessKey);
Config.dataListMap.remove(businessKey);
Config.appMap.remove(businessKey);
}
}
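A note on the atlas convention this class parses: each from_content/to_content/from_id/to_id value is split on ":" and the second half is handed to fastjson's JSONPath.eval against a crawled record, i.e. the values appear to follow a "<label>:<JSONPath>" format. A minimal sketch, with hypothetical field values:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;

public class AtlasPathExample {
    public static void main(String[] args) {
        // A crawled record as it would sit in Config.dataListMap (hypothetical content)
        JSONObject record = JSONObject.parseObject(
                "{\"pageType\":\"storyDetailPage\",\"title\":\"hello\",\"postId\":\"p1\"}");
        // Atlas entry value: "<label>:<JSONPath>"; only the path part is evaluated
        String fromContent = "content:$.title";
        String path = fromContent.split(":")[1];             // "$.title"
        String value = (String) JSONPath.eval(record, path); // evaluated against the record
        System.out.println(value);                           // prints "hello"
    }
}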

src/main/java/com/bfd/crawl_zstp/service/impl/DataProcessServiceImpl.java (+114)

@@ -0,0 +1,114 @@
package com.bfd.crawl_zstp.service.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bfd.crawl_zstp.service.DataCorrelationService;
import com.bfd.crawl_zstp.service.DataProcessService;
import com.bfd.crawl_zstp.util.Config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author guowei
*/
@Service
@Slf4j
public class DataProcessServiceImpl implements DataProcessService {
@Resource
DataCorrelationService dataCorrelationService;
@Override
public void process(JSONObject jsonObject) {
JSONObject data = jsonObject.getJSONObject("data");
String businessKey = data.getString("businessKey");
if(data.toJSONString().contains("userAuthenPage")){
log.info("水军请求,修改数据组ID");
businessKey = IdUtil.randomUUID() + "_user";
}
if(data.toJSONString().contains("fakeNewsPage")){
log.info("真假新闻请求,修改数据组ID");
businessKey = IdUtil.randomUUID() + "_fakeNews";
}
log.info("数据组ID --> " + businessKey + ";data -->" + jsonObject);
if (!Config.dataMap.containsKey(businessKey)) {
Config.dataMap.put(businessKey, new HashMap<>());
}
// Collect the atlas (relation) definitions for this data group
JSONObject input = jsonObject.getJSONObject("input");
JSONArray atlas = input.getJSONArray("atlas");
String app_code = jsonObject.getString("app_code");
String finalBusinessKey = businessKey;
atlas.forEach(value -> {
try {
JSONObject item = (JSONObject) JSON.toJSON(value);
String from_ontology_id = item.getString("from_ontology_id");
String to_ontology_id = item.getString("to_ontology_id");
String key = from_ontology_id + "_" + to_ontology_id;
Map atla = (Map) Config.dataMap.get(finalBusinessKey);
if (!atla.containsKey(key)) {
atla.put(key, item);
Config.dataMap.put(finalBusinessKey, atla);
}
}catch (Exception e){
log.error("处理atlas失败,value --> "+value);
}
});
if (!Config.dataListMap.containsKey(finalBusinessKey)) {
Config.dataListMap.put(finalBusinessKey, new ArrayList<>());
}
if (!Config.appMap.containsKey(finalBusinessKey)) {
String subject_id = jsonObject.getString("source_data_id");
Map appmap = new HashMap<>();
appmap.put("app_code",app_code);
appmap.put("subject_id",subject_id);
Config.appMap.put(finalBusinessKey,appmap);
}
// Iterate over all keys in data; UUID-shaped keys carry crawled records
for (String key : data.keySet()) {
if (key.matches("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")) {
String uuidValue = data.getString(key);
List dataList = (List) Config.dataListMap.get(finalBusinessKey);
JSONObject parseObject = JSONObject.parseObject(uuidValue);
// Null-check before use; parseObject returns null for empty/"null" input
if (parseObject == null) {
log.error("Record is empty --> " + uuidValue);
continue;
}
parseObject.put("app_code", app_code);
dataList.add(parseObject);
Config.dataListMap.put(finalBusinessKey, dataList);
Config.requestMap.put(finalBusinessKey,jsonObject);
}
}
// The end marker means this group of data can be correlated
if (data.containsKey("isLast")) {
log.info("数据组ID --> " + finalBusinessKey + ",结束调用,开始关联处理");
dataCorrelationService.correlationData(finalBusinessKey);
}
}
}

src/main/java/com/bfd/crawl_zstp/util/AsyncConfig.java (+37)

@@ -0,0 +1,37 @@
package com.bfd.crawl_zstp.util;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
 * @author guowei
 * Async task thread pool.
 * The @EnableAsync annotation switches on Spring's async support so @Async methods run off-thread;
 * the pool itself comes from implementing AsyncConfigurer and overriding getAsyncExecutor.
 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
// Define the thread pool
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// Core pool size
threadPoolTaskExecutor.setCorePoolSize(10);
// Maximum pool size
threadPoolTaskExecutor.setMaxPoolSize(50);
// Task queue capacity
threadPoolTaskExecutor.setQueueCapacity(200);
// Initialize the pool
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}

src/main/java/com/bfd/crawl_zstp/util/Config.java (+32)

@@ -0,0 +1,32 @@
package com.bfd.crawl_zstp.util;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author guowei
*/
@Component
public class Config {
/**
 * Atlas (relation) definitions per data group.
 * ConcurrentHashMap: these maps are shared between request threads and @Async workers.
 */
public static Map dataMap = new ConcurrentHashMap<>();
/**
 * Raw data list per data group
 */
public static Map dataListMap = new ConcurrentHashMap<>();
public static Map appMap = new ConcurrentHashMap<>();
/**
 * Data matching queue
 */
public static LinkedBlockingQueue<Map> dataQueue = new LinkedBlockingQueue<>();
public static Map requestMap = new ConcurrentHashMap<>();
}

src/main/java/com/bfd/crawl_zstp/util/Constants.java (+153)

@@ -0,0 +1,153 @@
package com.bfd.crawl_zstp.util;
/**
 * Constants.
 * @author jian.mao
 * @date November 15, 2022
 * @description
 */
public class Constants {
/************************* Blueprint constant key names *********************************/
public final static String SCHEDULING = "scheduling";
public final static String TYPE = "type";
public final static String INTERVAL = "interval";
public final static String CREATED = "created";
public final static String LAST_EDIT = "last_edit";
public final static String BLUEPRINT_ID = "blueprint_id";
public final static String BLUEPRINTID = "blueprintId";
public final static String BLUEPRINT_NAME = "name";
public final static String SCENARIO = "scenario";
public final static String AUTOCOMMITTRIGGERLAST = "autoCommitTriggerLast";
public final static String FRESHVARIABLES = "freshVariables";
public final static String AUTOCOMMIT = "autoCommit";
public final static String MAXERRORS = "maxErrors";
public final static String DATALOSS = "dataloss";
public final static String POSITION = "position";
public final static String SCENES_ID = "scenes_id";
public final static String SCENESID = "scenesId";
public final static String MULTI_BRANCH = "multi_branch";
public final static String SINGLE = "single";
/** Number of retries so far **/
public final static String ERROR_TIME = "error_time";
public final static String PREVIOUS_RESULT = "previous_result";
/**** Data ID *****/
public final static String BUSINESSKEY = "businessKey";
/************************* Metadata constant key names *********************************/
public final static String LABEL_COL = "label_col";
public final static String LABEL = "label";
public final static String USER = "user";
public final static String ADMIN = "admin";
public final static String ADDRESS = "address";
public final static String DATASOURCE = "datasource";
public final static String INDEX = "index";
/************************* App constant key names *********************************/
public final static String APPS = "apps";
public final static String TRANSFER_ID = "transfer_id";
public final static String MODULE = "module";
public final static String VERSION = "version";
public final static String METADATA = "metadata";
public final static String APP_NAME = "name";
public final static String DESCRIBE = "describe";
public final static String NEXT_APP_ID = "next_app_id";
public final static String EDGE_ID = "edge_id";
public final static String START_ID = "start_id";
public final static String END_ID = "end_id";
public final static String WAIT_CONDITION = "wait_condition";
public final static String START_TAG = "start_tag";
/************************* Module types *********************************/
public final static String FILE = "file";
public final static String OCR = "OCR";
public final static String FILTER = "Filter";
public final static String CHATGPT = "ChatGPT";
public final static String MYSQL = "mysql";
/************************* Other constants *********************************/
public final static String UNDERLINE = "_";
public final static String RESULT_TOPIC = null;
public static final String EMPTY = "";
public static final String HTTP = "http";
public static final String REQUEST_ERROR_MESSAGE = "Download failed error is";
public static final String REQUEST_RESULT = "result";
public static final String REQUEST_RESULT_RESULTS = "results";
public static final String MAP_TYPE = "Map";
public static final String LIST_TYPE = "List";
public static final String STRING_TYPE = "String";
public static final String DOCUMENT_TYPE = "doc";
public static final String FILTER_ZH = "过滤器";
public static final String JSON_SELE_SYMBOL = "$.";
public static final String LEFT_BRACKETS = "[";
public static final String RIGTH_BRACKETS = "]";
public static final String TASKTYPE = "taskType";
public static final Integer USER_TYPE = 1;
public static final Integer KEYWORD_TYPE = 0;
public static final Integer DETAIL_TYPE = 2;
public static final String CID = "cid";
public static final String SITETYPE = "siteType";
public static final Integer DEFULT_SUBJECTID = 304864;
public static final Integer DEFULT_CRAWLCYCLICITYTIME = 1440;
public static final String CRAWLENDTIME = "crawlEndTime";
public static final String CRAWLSTARTTIME = "crawlStartTime";
public static final String CRAWLPAGETYPES = "crawlPageTypes";
public static final String APPID = "113ic";
public static final String APP_ID = "appId";
public final static String ID = "id";
public static final Integer DEFULT_CRAWLPERIODHOUR = 24;
public static final String CREATEUSERID = "662015832180933762";
public static final String CRAWL_ADD_URL = "https://caiji.percent.cn/api/crawl/remote/task/save";
public static final String CRAWLKEYWORD = "crawlKeyword";
public static final String ATTACHTAG = "attachTag";
public static final String ATTACHTAG_VALUE = "analyze";
public static final String KEYWORD = "keyword";
public static final String SITEID = "siteId";
public static final String RESULTS = "results";
public static final String RESULT = "result";
public static final String CRAWLDATAFLAG = "crawlDataFlag";
public static final String CRAWLDATAFLAG_PREFIX = "\"crawlDataFlag\":\"keyword:";
public static final String TID = "tid";
public static final Long TIME_OUT = 1800000L;
public static final String ATTR = "attr";
public static final String HASVIDEO = "hasVideo";
public static final String CRAWL_END_MARK = "crawl_end_mark";
public static final String CRAWL_END_MESSAGE = "crawl_end_message";
public static final String CRAWL_END_MESSAGE_VALUE = "数据采集完成";
public static final String SUBJECTID = "subjectId";
public static final String TASKID = "taskId";
public static final int SUCCESS_CODE = 200;
public static final String WEB_URL_SUFFIX = "/api/aogeo/api/cda/caiji/status";
public static final String STATUS = "status";
/************************ Redis *************************************/
public static final String LOCK_KEY = "myLock";
public static final long LOCK_EXPIRE_TIME = 300000;
/************************ Application parameters *************************************/
public static final String CODE = "code";
public static final String MESSAGE = "message";
public static final String INPUT = "input";
public static final String OUTPUT = "output";
public static final String FORM = "form";
public static final String FIELD = "field";
public static final String VALUE = "value";
public static final String DATA = "data";
public static final String COLON_EN = ":";
public static final String DATABASE = "database";
public static final String TABLE = "table";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String PORT = "port";
public static final String HOSTNAME = "hostname";
public static final String DATATYPE = "dataType";
public static final String RULES = "rules";
public static final String GENID = "genId";
public static final String KEY = "key";
public static final String DATAID = "dataId";
}

src/main/java/com/bfd/crawl_zstp/util/ESClientFactory.java (+58)

@@ -0,0 +1,58 @@
package com.bfd.crawl_zstp.util;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
public class ESClientFactory {
private static final int CONNECT_TIME_OUT = 1000;
private static final int SOCKET_TIME_OUT = 30000;
private static final int CONNECTION_REQUEST_TIME_OUT = 500;
private static final int MAX_CONNECT_NUM = 100;
private static final int MAX_CONNECT_PER_ROUTE = 100;
private static boolean uniqueConnectTimeConfig = false;
private static boolean uniqueConnectNumConfig = true;
private static final String HOST = "localhost";
private static final int PORT = 9200;
private static final String SCHEME = "http";
private static final String USERNAME = "elastic"; // replace with your Elasticsearch username
private static final String PASSWORD = "baifendian"; // replace with your Elasticsearch password
public static RestHighLevelClient init() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(USERNAME, PASSWORD));
RestClientBuilder builder = RestClient.builder(new HttpHost("172.18.1.147", 9200, "http"))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider));
return new RestHighLevelClient(builder);
// RestClientBuilder builder = RestClient.builder(new HttpHost(HOST, PORT, SCHEME))
// .setRequestConfigCallback(getRequestConfigCallback())
// .setHttpClientConfigCallback(getHttpClientConfigCallback());
//
// RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
// return restHighLevelClient;
}
private static RestClientBuilder.RequestConfigCallback getRequestConfigCallback() {
if (uniqueConnectTimeConfig) {
// Set the connection timeout as needed
return requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(5000);
}
return null;
}
}

src/main/java/com/bfd/crawl_zstp/util/EsExecPorcess.java (+87)

@@ -0,0 +1,87 @@
package com.bfd.crawl_zstp.util;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @PROJECT_NAME: result_assembly
* @DESCRIPTION:
* @AUTHOR: jian.mao
* @DATE: 2023/11/1 17:06
*/
@Component
@Slf4j
public class EsExecPorcess {
/**
 * Write a document to ES.
 * @param data source data
 * @param dataId document id
 * @return the document that was written
 */
public static Map<String, Object> save(Map<String, Object> data, String dataId){
// Create the credentials provider
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "baifendian"));
// Create an HttpClient with the credentials provider
CloseableHttpClient httpClient = HttpClients.custom()
.setDefaultCredentialsProvider(credentialsProvider)
.build();
try {
StringBuffer url = new StringBuffer("http://");
url.append("172.18.1.147");
url.append(":");
url.append("9200");
url.append("/");
url.append("youzhi_cda_kg_relation");
url.append("/_doc/");
url.append(dataId);
HttpPost httpPost = new HttpPost(url.toString());
StringEntity entity = new StringEntity(JSONObject.toJSONString(data), ContentType.APPLICATION_JSON);
httpPost.setEntity(entity);
// Send the request and read the response
HttpResponse response = httpClient.execute(httpPost);
// Handle the response
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity());
int code = 201;
int updateCode = 200;
if (statusCode == code) {
log.info("Document written successfully: {}", responseBody);
} else if (statusCode == updateCode) {
log.info("Document already existed and was updated: {}", responseBody);
} else {
log.error("Document write failed: {}", responseBody);
}
} catch (Exception e) {
log.error("文档写入异常:",e);
}finally{
try {
httpClient.close();
} catch (IOException e) {
log.error("关闭httpclient连接对象异常,",e);
}
}
return data;
}
}

src/main/java/com/bfd/crawl_zstp/util/EsQueryUtil.java (+123)

@@ -0,0 +1,123 @@
package com.bfd.crawl_zstp.util;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author guowei
*/
@Component
@Slf4j
public class EsQueryUtil {
private static RestHighLevelClient restClient = ESClientFactory.init();
@SneakyThrows
public static void queryOtherNewsFromEs(String type, String authorId,String to_ontology_icon) {
try {
log.info("查询ES,type:"+type+",Id:"+authorId+",ontology_icon:"+to_ontology_icon);
//记录上次请求数据保留的时间 设定滚动时间间隔
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));
SearchRequest searchRequest = new SearchRequest("youzhi_cda_kg_relation");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// Number of hits returned per batch
searchSourceBuilder.size(1000);
searchSourceBuilder.query(QueryBuilders.matchQuery(type, authorId));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = null;
// Retry the initial search up to 5 times
int retryCount = 1;
while (retryCount <= 5) {
try {
searchResponse = restClient.search(searchRequest);
break;
} catch (Exception e) {
log.error("Query failed; sleeping briefly before retrying", e);
Thread.sleep(5000);
}
retryCount++;
}
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
System.out.println("查询结果:" + searchHits.length);
int index = 0;
//处理数据
getIndex(searchHits, type,to_ontology_icon);
if (searchHits.length == 1000) {
// Keep scrolling until no hits remain
while (searchHits != null && searchHits.length > 0 ) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = restClient.searchScroll(scrollRequest);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
if (searchHits != null && searchHits.length > 0) {
log.info("--- 下一页-----专题数据长度" + searchHits.length);
//处理数据
getIndex(searchHits, type,to_ontology_icon);
// num++;
}
}
}
Thread.sleep(5000);
// Clear the scroll context
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId); // setScrollIds() can clear several scroll ids at once
ClearScrollResponse clearScrollResponse = restClient.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
log.info("ID:"+authorId+"->----查询任务结束,length:" + searchHits.length);
} catch (Exception e) {
log.error("ES scroll query failed", e);
}
}
private static void getIndex(SearchHit[] searchHits, String type,String ontology_icon) {
for (SearchHit searchHit : searchHits) {
String id = searchHit.getId();
Map<String, Object> obj = searchHit.getSourceAsMap();
switch (type){
case "from_id":
obj.put("from_ontology_icon",ontology_icon);
break;
case "to_id":
obj.put("to_ontology_icon",ontology_icon);
break;
default:
break;
}
System.out.println("new:"+obj);
EsExecPorcess.save(obj,id);
}
}
public static void main(String[] args) {
queryOtherNewsFromEs("to_id","782640277","#FF6655");
}
}

src/main/java/com/bfd/crawl_zstp/util/KfkUtil.java (+81)

@@ -0,0 +1,81 @@
package com.bfd.crawl_zstp.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
 * @author guowei
 * Kafka utility class
 */
@Component
@Slf4j
public class KfkUtil {
private static String topic;
private static String brokerList;
@Value("${crawl.kafka.topic}")
public void setTopic(String topic) {
KfkUtil.topic = topic;
}
@Value("${crawl.kafka.brokers}")
public void setBrokerList(String brokerList) {
KfkUtil.brokerList = brokerList;
}
private static KafkaProducer<String, String> kafkaProducer;
/**
 * Get the shared KafkaProducer instance
 */
public static KafkaProducer<String, String> getProducer() {
if (kafkaProducer == null) {
Properties props = new Properties();
// broker list (server addresses)
props.put("bootstrap.servers", brokerList);
// "all": a message counts as committed only once every in-sync follower has responded
props.put("acks", "all");
// number of times to retry a failed send
props.put("retries", 3);
// the producer batches records to reduce request count; default batch size in bytes
props.put("batch.size", 16384);
// batch.size triggers an immediate send once reached, regardless of linger.ms below
// linger.ms 1: wait up to 1 ms before sending so more records can join the batch
props.put("linger.ms", 1);
// total memory the producer may use to buffer records
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
/**
 * Close the KafkaProducer instance
 */
public static void closeProducer() {
if (kafkaProducer != null) {
log.info("----------close producer----------");
kafkaProducer.close();
kafkaProducer = null;
}
}
public static void sendKafka(String resultData) {
KafkaProducer<String, String> producer = getProducer();
ProducerRecord<String, String> se = new ProducerRecord<String, String>(topic, resultData);
producer.send(se);
log.info("发送kafka成功");
// num++;
}
}

src/main/resources/application.yml (+36)

@@ -0,0 +1,36 @@
crawl:
kafka:
topic: analyze
brokers: 172.16.12.55:9092,172.16.12.56:9092,172.16.12.57:9092
task:
taskData: ./data/taskData.txt
server:
port: 9999
# Log level
logging:
level:
com:
bfd: INFO
# Log path
log:
path: ./logs
spring:
boot:
admin:
client:
url: http://172.16.12.55:8001
# instance:
# service-base-url: http://10.10.114.17:9999
application:
name: zstp
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
health:
elasticsearch:
enabled: false

src/main/resources/logback-spring.xml (+38)

@@ -0,0 +1,38 @@
<configuration>
<!-- Properties: resolved from the matching entries in the Spring config file -->
<springProperty scope="context" name="logging.path" source="logging.log.path"/>
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
<!-- Default console output; apps usually run in the background in production, so this is of little use -->
<!-- <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{HH:mm:ss.SSS} %-5level %logger{80} - %msg%n</Pattern>
</encoder>
</appender> -->
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.path}/crawlSchedule.log
<!-- ${logging.path}/sendKafka.log -->
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</FileNamePattern>
<!-- <FileNamePattern>${logging.path}/sendKafka.log.%d{yyyy-MM-dd}</FileNamePattern> -->
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<!-- <appender-ref ref="STDOUT"/> -->
</root>
</configuration>