Browse Source

词云应用

master
55007 6 months ago
commit
2156c5593e
  1. 40
      .classpath
  2. 2
      .gitignore
  3. 23
      .project
  4. 5
      .settings/org.eclipse.core.resources.prefs
  5. 9
      .settings/org.eclipse.jdt.core.prefs
  6. 4
      .settings/org.eclipse.m2e.core.prefs
  7. 198
      pom.xml
  8. 16
      src/main/java/com/bfd/wordcloud/WordCloudApplication.java
  9. 37
      src/main/java/com/bfd/wordcloud/controller/ApiController.java
  10. 13
      src/main/java/com/bfd/wordcloud/service/ProcessService.java
  11. 81
      src/main/java/com/bfd/wordcloud/service/impl/ProcessServiceImpl.java
  12. 22
      src/main/java/com/bfd/wordcloud/util/Constants.java
  13. 127
      src/main/java/com/bfd/wordcloud/util/EsExecPorcess.java
  14. 83
      src/main/java/com/bfd/wordcloud/util/KfkUtil.java
  15. 27
      src/main/java/com/bfd/wordcloud/util/Utils.java
  16. 25
      src/main/java/com/bfd/wordcloud/util/ZookeeperConfig.java
  17. 58
      src/main/java/com/bfd/wordcloud/util/ZookeeperNodeMonitor.java
  18. 37
      src/main/resources/application.yml
  19. 38
      src/main/resources/logback-spring.xml
  20. 13
      src/test/java/com/bfd/wordcloud/WordCloudApplicationTests.java

40
.classpath

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

2
.gitignore

@ -0,0 +1,2 @@
/target/
/logs/

23
.project

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>word_cloud</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

5
.settings/org.eclipse.core.resources.prefs

@ -0,0 +1,5 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

9
.settings/org.eclipse.jdt.core.prefs

@ -0,0 +1,9 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

4
.settings/org.eclipse.m2e.core.prefs

@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

198
pom.xml

@ -0,0 +1,198 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bfd</groupId>
<artifactId>word_cloud</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>word_cloud</name>
<description>word_cloud</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.0.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-client</artifactId>
<version>2.2.4</version>
</dependency>
<!--redis -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<!--不打入jar包的文件类型或者路径-->
<excludes>
<exclude>*.properties</exclude>
<exclude>*.yml</exclude>
<exclude>*.yaml</exclude>
</excludes>
<archive>
<manifest>
<!-- 执行的主程序路径 -->
<mainClass>com.bfd.wordcloud.WordCloudApplication</mainClass>
<!--是否要把第三方jar放到manifest的classpath中-->
<addClasspath>true</addClasspath>
<!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/-->
<classpathPrefix>lib/</classpathPrefix>
<!-- 打包时 MANIFEST.MF 文件不记录的时间戳版本 -->
<useUniqueVersions>false</useUniqueVersions>
</manifest>
<manifestEntries>
<!-- 在 Class-Path 下添加配置文件的路径 -->
<Class-Path>lib/pauseTool-1.0.jar config/</Class-Path>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<resources>
<!--把配置文件打包到指定路径-->
<resource>
<directory>src/main/resources/</directory>
<includes>
<include>*.properties</include>
<include>*.yml</include>
<exclude>*.yaml</exclude>
</includes>
</resource>
</resources>
<outputDirectory>${project.build.directory}/config</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

16
src/main/java/com/bfd/wordcloud/WordCloudApplication.java

@ -0,0 +1,16 @@
package com.bfd.wordcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author guowei
*/
@SpringBootApplication
public class WordCloudApplication {
public static void main(String[] args) {
SpringApplication.run(WordCloudApplication.class, args);
}
}

37
src/main/java/com/bfd/wordcloud/controller/ApiController.java

@ -0,0 +1,37 @@
package com.bfd.wordcloud.controller;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.wordcloud.service.ProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @author guowei
*/
@RestController
@Slf4j
@RequestMapping(value = "/word_cloud")
@CrossOrigin(origins = "*", maxAge = 3600)
public class ApiController {
@Resource
ProcessService processService;
/**
* 视频上传 Api
* @param jsonObject
* @return
*/
@RequestMapping(value = "/process", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
public String varAna(@RequestBody JSONObject jsonObject) {
log.info("词云请求参数:"+jsonObject);
processService.process(jsonObject);
return "success";
}
}

13
src/main/java/com/bfd/wordcloud/service/ProcessService.java

@ -0,0 +1,13 @@
package com.bfd.wordcloud.service;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.stereotype.Service;
/**
* @author guowei
*/
@Service
public interface ProcessService {
void process(JSONObject jsonObject);
}

81
src/main/java/com/bfd/wordcloud/service/impl/ProcessServiceImpl.java

@ -0,0 +1,81 @@
package com.bfd.wordcloud.service.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.wordcloud.service.ProcessService;
import com.bfd.wordcloud.util.EsExecPorcess;
import com.bfd.wordcloud.util.KfkUtil;
import com.bfd.wordcloud.util.Utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* @author guowei
*/
@Service
@Slf4j
public class ProcessServiceImpl implements ProcessService {
@Override
public void process(JSONObject jsonObject) {
//输入
JSONObject input = jsonObject.getJSONObject("input");
//输出
JSONObject output = jsonObject.getJSONObject("output");
//data
JSONObject data = jsonObject.getJSONObject("data");
System.out.println("queryData ---> input:" + JSON.toJSONString(input));
System.out.println("queryData ---> output:" + JSON.toJSONString(output));
System.out.println("queryData ---> data:" + JSON.toJSONString(data));
Map results = new HashMap<>(16);
Map outMap = new HashMap<>(16);
try {
String field = input.getString("field");
// Integer top = input.getInteger("top");
// Integer type = input.getInteger("type");
// JSONArray color = input.getJSONArray("color");
Object key = Utils.jsonParse(field, data);
if ( key==null){
throw new NullPointerException();
}
if (key instanceof String){
key = new ArrayList<>().add(key);
}
Map resultMap = new HashMap<>(16);
resultMap.put("app_code", jsonObject.getString("app_code"));
resultMap.put("app_id", jsonObject.getString("id"));
resultMap.put("business_key", jsonObject.getString("business_key"));
resultMap.put("scenes_id", jsonObject.getString("scenes_id"));
resultMap.put("data", key);
EsExecPorcess.save(resultMap, IdUtil.randomUUID());
results.put("status", 1);
results.put("message", "成功");
outMap.put("content", JSON.toJSONString(resultMap));
} catch (Exception e) {
e.printStackTrace();
results.put("status", 2);
results.put("message", "失败");
outMap.put("content", "失败");
}
outMap.put("isLast", 1);
results.put("results", JSON.toJSONString(outMap));
jsonObject.put("result", results);
KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
log.info("处理完成,result:" + JSON.toJSONString(results));
}
}

22
src/main/java/com/bfd/wordcloud/util/Constants.java

@ -0,0 +1,22 @@
package com.bfd.wordcloud.util;
import org.springframework.stereotype.Component;
/**
* @author guowei
*/
@Component
public class Constants {
public final static String STOP = "stop";
public final static String START = "start";
public final static String DELETE = "delete";
public final static String OR = "or";
public final static String SELECT = "select";
public final static String OPERATION = "operation";
}

127
src/main/java/com/bfd/wordcloud/util/EsExecPorcess.java

@ -0,0 +1,127 @@
package com.bfd.wordcloud.util;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @PROJECT_NAME: result_assembly
* @DESCRIPTION:
* @AUTHOR: jian.mao
* @DATE: 2023/11/1 17:06
*/
@Component
@Slf4j
public class EsExecPorcess {
/**
* 写入es方法
* @param data 数据源
* retrun 文档结构
*/
public static Map<String, Object> save(Map<String, Object> data,String dataId){
// 创建凭据提供者
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "baifendian"));
// 创建一个 HttpClient设置凭据提供者
CloseableHttpClient httpClient = HttpClients.custom()
.setDefaultCredentialsProvider(credentialsProvider)
.build();
try {
StringBuffer url = new StringBuffer("http://");
url.append("172.18.1.147");
url.append(":");
url.append("9200");
url.append("/");
url.append("youzhi_cda_word_cloud");
url.append("/_doc/");
url.append(dataId);
HttpPost httpPost = new HttpPost(url.toString());
StringEntity entity = new StringEntity(JSONObject.toJSONString(data), ContentType.APPLICATION_JSON);
httpPost.setEntity(entity);
// 发送请求并获取响应
HttpResponse response = httpClient.execute(httpPost);
// 处理响应
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity());
int code = 201;
int updateCode = 200;
if (statusCode == code) {
System.out.println("文档写入成功");
log.info("文档数据写入成功:{}",responseBody);
} else if(statusCode == updateCode) {
log.info("文档已存在并更新成功:{}",responseBody);
}else{
log.error("文档写入失败:{}",responseBody);
}
} catch (Exception e) {
log.error("文档写入异常:",e);
}finally{
try {
httpClient.close();
} catch (IOException e) {
log.error("关闭httpclient连接对象异常,",e);
}
}
return data;
}
public static void deleteDataBycondition(String conditionField, Object conditionValue){
// 创建凭据提供者
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "baifendian"));
// 创建一个 HttpClient设置凭据提供者
try (CloseableHttpClient httpClient = HttpClients.custom()
.setDefaultCredentialsProvider(credentialsProvider)
.build()) {
// 构造请求 URL
StringBuffer host = new StringBuffer("http://");
host.append("172.18.1.147");
host.append(":");
host.append("9200");
host.append("/");
host.append("youzhi_cda_word_cloud");
host.append("/_delete_by_query");
// 构造请求体
// String requestBody = String.format("{ \"query\": { \"match\": { \"%s\": \"%s\" }}}",conditionField, conditionValue);
//删除条件 添加version
String requestBody = String.format("{\"query\":{\"bool\":{\"must\":[{\"match\":{\"%s\":\"%s\"}}]}}}",conditionField, conditionValue);
// 创建 HTTP POST 请求
HttpPost httpPost = new HttpPost(host.toString());
httpPost.setHeader("Content-Type", "application/json");
httpPost.setEntity(new StringEntity(requestBody));
// 发送请求并获取响应
HttpResponse response = httpClient.execute(httpPost);
// 解析响应
HttpEntity entity = response.getEntity();
if (entity != null) {
String responseString = EntityUtils.toString(entity);
log.info("Response: " + responseString);
} else {
log.info("No response received");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

83
src/main/java/com/bfd/wordcloud/util/KfkUtil.java

@ -0,0 +1,83 @@
package com.bfd.wordcloud.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
* @author guowei
* kfk工具类
*/
@Component
@Slf4j
public class KfkUtil {
private static String topic;
private static String brokerList;
@Value("${crawl.kafka.topic}")
public void setTopic(String topic) {
KfkUtil.topic = topic;
}
@Value("${crawl.kafka.brokers}")
public void setBrokerList(String brokerList) {
KfkUtil.brokerList = brokerList;
}
private static KafkaProducer<String, String> kafkaProducer;
public static int num = 0;
/**
* 获取KafkaProducer实例
*/
public static KafkaProducer<String, String> getProducer() {
// synchronized (kafkaProducer) {
if (kafkaProducer == null) {
Properties props = new Properties();
//xxx服务器ip
props.put("bootstrap.servers", brokerList);
//所有follower都响应了才认为消息提交成功"committed"
props.put("acks", "all");
//retries = MAX 无限重试直到你意识到出现了问题:)
props.put("retries", 3);
//producer将试图批处理消息记录以减少请求次数.默认的批量处理消息字节数
props.put("batch.size", 16384);
//batch.size当批量的数据大小达到设定值后就会立即发送不顾下面的linger.ms
//延迟1ms发送这项设置将通过增加小的延迟来完成--不是立即发送一条记录producer将会等待给定的延迟时间以允许其他消息记录发送这些消息记录可以批量处理
props.put("linger.ms", 1);
//producer可以用来缓存数据的内存大小
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(props);
}
// }
return kafkaProducer;
}
/**
* 关闭KafkaProducer实例
*/
public static void closeProducer() {
if (kafkaProducer != null) {
log.info("----------close producer----------");
kafkaProducer.close();
kafkaProducer = null;
}
}
public static void sendKafka(String resultData) {
KafkaProducer<String, String> producer = getProducer();
ProducerRecord<String, String> se = new ProducerRecord<String, String>(topic, resultData);
producer.send(se);
log.info("发送kafka成功");
// num++;
}
}

27
src/main/java/com/bfd/wordcloud/util/Utils.java

@ -0,0 +1,27 @@
package com.bfd.wordcloud.util;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author guowei
*/
@Component
public class Utils {
public static Object jsonParse(String key, Map data) {
String[] keySplit = key.split(":");
String jsonPath = keySplit[1];
if (!data.containsKey(keySplit[0])){
return "";
}
String dataJson = (String) data.get(keySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
Object dataValue = JSONPath.eval(dataJsonObject, jsonPath);
return dataValue;
}
}

25
src/main/java/com/bfd/wordcloud/util/ZookeeperConfig.java

@ -0,0 +1,25 @@
package com.bfd.wordcloud.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author jian.mao
* @date 2024年4月16日
* @description
*/
@Configuration
public class ZookeeperConfig {
@Value("${zookeeper.connection-string}")
private String connectionString;
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
return curatorFramework;
}
}

58
src/main/java/com/bfd/wordcloud/util/ZookeeperNodeMonitor.java

@ -0,0 +1,58 @@
package com.bfd.wordcloud.util;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author jian.mao
* @date 2024年4月17日
* @description
*/
@Component
@Slf4j
public class ZookeeperNodeMonitor {
@Autowired
private CuratorFramework curatorFramework;
@Value("${zookeeper.publish-node}")
private String nodePath;
@PostConstruct
public void init() {
try {
// 创建节点监听器
NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
nodeCache.start(true);
// 监听节点变化
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
String nodeData = new String(data);
System.out.println("Node data changed: " + nodeData);
log.info("Node data changed: " + nodeData);
JSONObject zkData = JSONObject.parseObject(nodeData);
if(zkData.getString(Constants.OPERATION).equals(Constants.START) || zkData.getString(Constants.OPERATION).equals(Constants.DELETE)){
log.info("触发开启,尝试删除词云ES数据");
Integer scenes_id = zkData.getInteger("scenes_id");
// 在这里处理节点数据变化的逻辑比如通知其他组件或者执行相应的操作
EsExecPorcess.deleteDataBycondition("scenes_id",scenes_id);
}
}
});
} catch (Exception e) {
e.printStackTrace();
// 异常处理
}
}
}

37
src/main/resources/application.yml

@ -0,0 +1,37 @@
crawl:
kafka:
topic: produce_analyze
brokers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092
server:
port: 9022
#日志级别
logging:
level:
com:
bfd: INFO
#日志路径
log:
path: ./logs
spring:
boot:
admin:
client:
url: http://172.18.1.147:8001
instance:
service-base-url: http://172.18.1.147:9999
application:
name: 词云
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
health:
elasticsearch:
enabled: false
zookeeper:
connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181
publish-node: /analyze

38
src/main/resources/logback-spring.xml

@ -0,0 +1,38 @@
<configuration>
<!-- 属性文件:在properties文件中找到对应的配置项 -->
<springProperty scope="context" name="logging.path" source="logging.log.path"/>
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 -->
<!-- <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{HH:mm:ss.SSS} %-5level %logger{80} - %msg%n</Pattern>
</encoder>
</appender> -->
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.path}/crawlSchedule.log
<!-- ${logging.path}/sendKafka.log -->
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</FileNamePattern>
<!-- <FileNamePattern>${logging.path}/sendKafka.log.%d{yyyy-MM-dd}</FileNamePattern> -->
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<!-- <appender-ref ref="STDOUT"/> -->
</root>
</configuration>

13
src/test/java/com/bfd/wordcloud/WordCloudApplicationTests.java

@ -0,0 +1,13 @@
package com.bfd.wordcloud;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class WordCloudApplicationTests {
@Test
void contextLoads() {
}
}
Loading…
Cancel
Save