Browse Source

api客户端、服务端应用

master
55007 6 months ago
commit
75ffe89ddb
  1. 40
      .classpath
  2. 2
      .gitignore
  3. 23
      .project
  4. 5
      .settings/org.eclipse.core.resources.prefs
  5. 9
      .settings/org.eclipse.jdt.core.prefs
  6. 4
      .settings/org.eclipse.m2e.core.prefs
  7. 1
      README.md
  8. 282
      pom.xml
  9. 24
      src/main/java/com/bfd/app/Application.java
  10. 39
      src/main/java/com/bfd/app/cache/ConfigCache.java
  11. 65
      src/main/java/com/bfd/app/cache/LocalCache.java
  12. 44
      src/main/java/com/bfd/app/controller/ManageController.java
  13. 172
      src/main/java/com/bfd/app/entity/Constants.java
  14. 137
      src/main/java/com/bfd/app/handler/MainHandler.java
  15. 133
      src/main/java/com/bfd/app/process/ClientPorcess.java
  16. 60
      src/main/java/com/bfd/app/process/CycleExecPorcess.java
  17. 89
      src/main/java/com/bfd/app/process/ManagePorcess.java
  18. 26
      src/main/java/com/bfd/app/service/ManageService.java
  19. 84
      src/main/java/com/bfd/app/service/impl/ManageServiceImpl.java
  20. 48
      src/main/java/com/bfd/app/utils/DataUtil.java
  21. 177
      src/main/java/com/bfd/app/utils/DateUtil.java
  22. 1060
      src/main/java/com/bfd/app/utils/DownLoadUtil.java
  23. 27
      src/main/java/com/bfd/app/utils/EncryptionUtil.java
  24. 36
      src/main/java/com/bfd/app/utils/FileUtil.java
  25. 53
      src/main/java/com/bfd/app/utils/GPTResultParseUtil.java
  26. 32
      src/main/java/com/bfd/app/utils/JsonUtil.java
  27. 33
      src/main/java/com/bfd/app/utils/OtherUtils.java
  28. 92
      src/main/java/com/bfd/app/utils/PauseTool.java
  29. 18
      src/main/java/com/bfd/app/utils/QueueUtil.java
  30. 46
      src/main/java/com/bfd/app/utils/SpringBootKafka.java
  31. 23
      src/main/java/com/bfd/app/utils/ThrowMessageUtil.java
  32. 97
      src/main/resources/application.yml
  33. 36
      src/main/resources/logback-spring.xml
  34. 20
      src/test/java/com/bfd/AppTest.java

40
.classpath

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

2
.gitignore

@ -0,0 +1,2 @@
/logs/
/target/

23
.project

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>api_manager</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

5
.settings/org.eclipse.core.resources.prefs

@ -0,0 +1,5 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

9
.settings/org.eclipse.jdt.core.prefs

@ -0,0 +1,9 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

4
.settings/org.eclipse.m2e.core.prefs

@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

1
README.md

@ -0,0 +1 @@
api应用

282
pom.xml

@ -0,0 +1,282 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
<groupId>com.bfd</groupId>
<artifactId>api_manager</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>api_manager</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.10.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version> <!-- 根据你的需求选择版本 -->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.mchange/c3p0 -->
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect -->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<version>1.6.21</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jsoup/jsoup -->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>2.0.28</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-scratchpad</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.0.1</version>
</dependency>
<!-- Log4j 2 日志库 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.14.1</version> <!-- 或者你需要的其他版本 -->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.1</version> <!-- 或者你需要的其他版本 -->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<!--redis -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
<build>
<!-- <pluginManagement> --><!-- lock down plugins versions to avoid using Maven defaults (may be moved
to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- spring-boot-maven-plugin插件就是打包spring boot应用的 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.bfd.app.Application</mainClass>
<layout>ZIP</layout>
<includes>
<include>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<type>jar</type>
<includeTypes>jar</includeTypes>
<includeScope>runtime</includeScope>
<outputDirectory>${project.build.directory}/libs</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- </pluginManagement> -->
</build>
</project>

24
src/main/java/com/bfd/app/Application.java

@ -0,0 +1,24 @@
package com.bfd.app;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* 主入口
*
* @author jian.mao
* @date 2023年7月4日
* @description
*/
@SpringBootApplication
@EnableKafka
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

39
src/main/java/com/bfd/app/cache/ConfigCache.java

@ -0,0 +1,39 @@
package com.bfd.app.cache;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author jian.mao
* @date 2022年11月11日
* @description 静态变量类
*/
@Slf4j
public class ConfigCache {
/**启动条件**/
public static boolean isStart = true;
/*****任务队列*****/
public static LinkedBlockingDeque<Map<String, Object>> taskQueue = new LinkedBlockingDeque<Map<String,Object>>();
public static LinkedBlockingDeque<Map<String, Object>> taskClientQueue = new LinkedBlockingDeque<Map<String,Object>>();
public static LinkedBlockingDeque<Map<String, Object>> cycleTaskQueue = new LinkedBlockingDeque<Map<String,Object>>();
/**
* 队列录入任务
* @param queue
* @param task
*/
public static void putQueue(LinkedBlockingDeque<Map<String, Object>> queue,Map<String, Object> task){
//next app 写入队列准备调出
try {
queue.put(task);
} catch (InterruptedException e) {
log.error("队列写入data失败---");
}
}
}

65
src/main/java/com/bfd/app/cache/LocalCache.java

@ -0,0 +1,65 @@
package com.bfd.app.cache;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* token 缓存
* @author jian.mao
* @date 2023年11月8日
* @description
*/
@Slf4j
public class LocalCache {
private static final ConcurrentMap<String, Value> CACHE = new ConcurrentHashMap<>(64);
private LocalCache() {
}
public static LocalCache getInstance() {
return Inner.LOCAL_CACHE;
}
public static String get(String key) {
Value v = LocalCache.CACHE.get(key);
if (v == null || new Date().after(v.end)) {
return "";
}
log.debug("get key:{},time left:{}s", key, (v.end.getTime() - new Date().getTime()) / 1000);
return v.value;
}
public static void set(String key, String value, int expire, TimeUnit timeUnit) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, (int) timeUnit.toSeconds(expire));
Value v = new Value(value, calendar.getTime());
log.debug("put key:{}, expire time:{} ", key, calendar.getTime());
LocalCache.CACHE.put(key, v);
}
private static class Value {
String value;
Date end;
public Value(String value, Date time) {
this.value = value;
this.end = time;
}
}
private static class Inner {
private static final LocalCache LOCAL_CACHE = new LocalCache();
}
}

44
src/main/java/com/bfd/app/controller/ManageController.java

@ -0,0 +1,44 @@
package com.bfd.app.controller;
import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.bfd.app.service.ManageService;
/**
* @author jian.mao
* @date 2023年11月8日
* @description
*/
@Controller
@RequestMapping("/manage")
public class ManageController {
@Resource
private ManageService manageService;
@PostMapping("/server")
@ResponseBody
public String server(@RequestBody String dataJson){
String response = manageService.server(dataJson);
return response;
}
@PostMapping("/client")
@ResponseBody
public String client(@RequestBody String dataJson){
String response = manageService.client(dataJson);
return response;
}
@RequestMapping(value = "/hello", method = RequestMethod.GET)
@ResponseBody
public String hello(String param, String token) {
return "123";
}
}

172
src/main/java/com/bfd/app/entity/Constants.java

@ -0,0 +1,172 @@
package com.bfd.app.entity;
/**
* 常量实体类
* @author jian.mao
* @date 2022年11月15日
* @description
*/
public class Constants {
/*************************蓝图常量key名称*********************************/
public final static String SCHEDULING = "scheduling";
public final static String TYPE = "type";
public final static String INTERVAL = "interval";
public final static String CREATED = "created";
public final static String LAST_EDIT = "last_edit";
public final static String BLUEPRINT_ID = "blueprint_id";
public final static String BLUEPRINTID = "blueprintId";
public final static String BLUEPRINT_NAME = "name";
public final static String SCENARIO = "scenario";
public final static String AUTOCOMMITTRIGGERLAST = "autoCommitTriggerLast";
public final static String FRESHVARIABLES = "freshVariables";
public final static String AUTOCOMMIT = "autoCommit";
public final static String MAXERRORS = "maxErrors";
public final static String DATALOSS = "dataloss";
public final static String POSITION = "position";
public final static String SCENES_ID = "scenes_id";
public final static String SCENESID = "scenesId";
public final static String MULTI_BRANCH = "multi_branch";
public final static String SINGLE = "single";
/**已重试次数**/
public final static String ERROR_TIME = "error_time";
public final static String PREVIOUS_RESULT = "previous_result";
/****数据id*****/
public final static String BUSINESSKEY = "businessKey";
/*************************metadata常量key名称*********************************/
public final static String LABEL_COL = "label_col";
public final static String LABEL = "label";
public final static String USER = "user";
public final static String ADMIN = "admin";
public final static String ADDRESS = "address";
public final static String DATASOURCE = "datasource";
public final static String INDEX = "index";
/*************************app常量key名称*********************************/
public final static String APPS = "apps";
public final static String TRANSFER_ID = "transfer_id";
public final static String MODULE = "module";
public final static String VERSION = "version";
public final static String METADATA = "metadata";
public final static String APP_NAME = "name";
public final static String DESCRIBE = "describe";
public final static String NEXT_APP_ID = "next_app_id";
public final static String EDGE_ID = "edge_id";
public final static String START_ID = "start_id";
public final static String END_ID = "end_id";
public final static String WAIT_CONDITION = "wait_condition";
public final static String START_TAG = "start_tag";
/*************************module类型*********************************/
public final static String FILE = "file";
public final static String OCR = "OCR";
public final static String FILTER = "Filter";
public final static String CHATGPT = "ChatGPT";
public final static String MYSQL = "mysql";
/*************************other类型*********************************/
public final static String UNDERLINE = "_";
public final static String RESULT_TOPIC = null;
public static final String EMPTY = "";
public static final String HTTP = "http";
public static final String REQUEST_ERROR_MESSAGE = "Download failed error is";
public static final String REQUEST_RESULT = "result";
public static final String REQUEST_RESULT_RESULTS = "results";
public static final String MAP_TYPE = "Map";
public static final String LIST_TYPE = "List";
public static final String STRING_TYPE = "String";
public static final String DOCUMENT_TYPE = "doc";
public static final String FILTER_ZH = "过滤器";
public static final String JSON_SELE_SYMBOL = "$.";
public static final String LEFT_BRACKETS = "[";
public static final String RIGTH_BRACKETS = "]";
public static final String TASKTYPE = "taskType";
public static final Integer USER_TYPE = 1;
public static final Integer KEYWORD_TYPE = 0;
public static final Integer DETAIL_TYPE = 2;
public static final String CID = "cid";
public static final String SITETYPE = "siteType";
public static final Integer DEFULT_SUBJECTID = 304864;
public static final Integer DEFULT_CRAWLCYCLICITYTIME = 1440;
public static final String CRAWLENDTIME = "crawlEndTime";
public static final String CRAWLSTARTTIME = "crawlStartTime";
public static final String CRAWLPAGETYPES = "crawlPageTypes";
public static final String APPID = "113ic";
public static final String APP_ID = "appId";
public final static String ID = "id";
public static final Integer DEFULT_CRAWLPERIODHOUR = 24;
public static final String CREATEUSERID = "662015832180933762";
public static final String CRAWL_ADD_URL = "https://caiji.percent.cn/api/crawl/remote/task/save";
public static final String CRAWLKEYWORD = "crawlKeyword";
public static final String ATTACHTAG = "attachTag";
public static final String ATTACHTAG_VALUE = "analyze";
public static final String KEYWORD = "keyword";
public static final String SITEID = "siteId";
public static final String RESULTS = "results";
public static final String RESULT = "result";
public static final String CRAWLDATAFLAG = "crawlDataFlag";
public static final String CRAWLDATAFLAG_PREFIX = "\"crawlDataFlag\":\"keyword:";
public static final String TID = "tid";
public static final Long TIME_OUT = 1800000L;
public static final String ATTR = "attr";
public static final String HASVIDEO = "hasVideo";
public static final String CRAWL_END_MARK = "crawl_end_mark";
public static final String CRAWL_END_MESSAGE = "crawl_end_message";
public static final String CRAWL_END_MESSAGE_VALUE = "数据采集完成";
public static final String SUBJECTID = "subjectId";
public static final String TASKID = "taskId";
public static final String TASK_ID = "task_id";
public static final int SUCCESS_CODE = 200;
public static final String WEB_URL_SUFFIX = "/api/aogeo/api/cda/caiji/status";
public static final String STATUS = "status";
/************************redis*************************************/
public static final String LOCK_KEY = "myLock";
public static final long LOCK_EXPIRE_TIME = 300000;
/************************应用参数*************************************/
public static final String CODE = "code";
public static final String MESSAGES = "messages";
public static final String INPUT = "input";
public static final String OUTPUT = "output";
public static final String FORM = "form";
public static final String FIELD = "field";
public static final String VALUE = "value";
public static final String DATA = "data";
public static final String COLON_EN = ":";
/******************************admin*******************************/
public static final String AUTHORIZATION = "authorization";
public static final String TEMPERATURE = "temperature";
public static final String FIELDTYPE = "fieldType";
public static final String TOP_P = "top_p";
public static final String MODEL = "model";
public static final String PROMPT = "prompt";
public static final String DOUBAO_TURBO_URL = "https://ark.cn-beijing.volces.com/api/v3/chat/completions";
public static final String CHOICES = "choices";
public static final String CONTENT = "content";
public static final String MESSAGE = "message";
public static final String MASK_SENSITIVE_INFO = "mask_sensitive_info";
public static final String MAX_TOKENS = "max_tokens";
public static final int CHAT_TYPE_ONE = 1;
public static final String COMMITRESULT = "commitResult";
public static final String BASE_RESP = "base_resp";
public static final String STATUS_CODE = "status_code";
public static final String FILE_ID = "file_id";
public static final String TRACE = "trace";
public static final String METHOD = "method";
public static final String REQUESTURL = "requestUrl";
public static final String HEADERS = "headers";
public static final String BODY = "body";
public static final String BODYCONTENTTYPE = "bodyContentType";
public static final String GET = "get";
public static final String POST = "post";
public static final String CYCLE = "cycle";
public static final String UNIT = "cycle";
public static final String DURATION = "duration";
}

137
src/main/java/com/bfd/app/handler/MainHandler.java

@ -0,0 +1,137 @@
package com.bfd.app.handler;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.cache.ConfigCache;
import com.bfd.app.process.ClientPorcess;
import com.bfd.app.process.CycleExecPorcess;
import com.bfd.app.process.ManagePorcess;
import com.bfd.app.utils.FileUtil;
import com.bfd.app.utils.PauseTool;
/**
* 启动处理入口
*
* @author jian.mao
* @date 2023年11月3日
* @description
*/
@Component
@Order(value = 1)
@Slf4j
public class MainHandler implements ApplicationRunner {
@Value("${task.task-queue-path}")
private String taskPath;
@Value("${task.thread-num}")
private Integer threadNum;
@Resource
ManagePorcess managePorcess;
@Resource
ClientPorcess clientPorcess;
@Resource
CycleExecPorcess cycleExecPorcess;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Value("${zookeeper.connection-string}")
private String connectionString;
@Value("${zookeeper.publish-node}")
private String nodePath;
@Override
public void run(ApplicationArguments args) throws Exception {
PauseTool pauseTool = new PauseTool();
pauseTool.initializeRedisCache(stringRedisTemplate);
pauseTool.setupZookeeperListener(connectionString, nodePath);
for (Integer i = 0; i < threadNum; i++) {
//开启问答处理线程
new Thread(managePorcess).start();
log.info("开启api服务端处理线程-----");
//客户端现成
new Thread(clientPorcess).start();
log.info("开启api客户端处理线程-----");
}
//停止处理
waitDown();
//启动加载缓存任务
readTask(taskPath, ConfigCache.taskQueue);
}
@SuppressWarnings("unchecked")
public static void readTask(String path, LinkedBlockingDeque<Map<String, Object>> queue) {
File file = new File(path);
if (file.exists()) {
List<String> tasks = null;
try {
tasks = FileUtils.readLines(file, "UTF-8");
} catch (IOException e) {
e.printStackTrace();
}
for (String taskStr : tasks) {
Map<String, Object> task = JSONObject.parseObject(taskStr);
try {
queue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
file.delete();
}
}
/**
* 结束触发钩子
*/
public void waitDown() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// 停止线程
ConfigCache.isStart = false;
log.info("stop-------");
writeTsskToFile();
}
});
}
/**
* 任务持久化到硬盘
*/
public void writeTsskToFile() {
while (true) {
if (ConfigCache.taskQueue.size() > 0) {
try {
Map<String, Object> task = ConfigCache.taskQueue.take();
FileUtil.writeFile(taskPath, JSONObject.toJSONString(task));
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.info("taskQueue write is file end");
break;
}
}
}
}

133
src/main/java/com/bfd/app/process/ClientPorcess.java

@ -0,0 +1,133 @@
package com.bfd.app.process;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.cache.ConfigCache;
import com.bfd.app.entity.Constants;
import com.bfd.app.utils.DownLoadUtil;
import com.bfd.app.utils.PauseTool;
import com.bfd.app.utils.SpringBootKafka;
/**
* @author jian.mao
* @date 2023年11月8日
* @description
*/
@Component
@Slf4j
public class ClientPorcess implements Runnable {
@Autowired
private SpringBootKafka springBootKafka;
@Value("${customize-kafka.producer.topic}")
private String topic;
@Override
public void run() {
while (ConfigCache.isStart) {
Map<String, Object> task = null;
try {
task = ConfigCache.taskClientQueue.take();
log.info("客户端-任务队列长度:{}",ConfigCache.taskClientQueue.size());
log.info("客户端-任务:{}",JSONObject.toJSONString(task));
//输入配置
Map<String, Object> input = (Map<String, Object>) task.get(Constants.INPUT);
//输出配置
Map<String, Object> output = (Map<String, Object>) task.get(Constants.OUTPUT);
int scenesId = (int) task.get(Constants.SCENES_ID);
int version = (int) task.get(Constants.VERSION);
String pauseKey = scenesId + "_" + version;
if (!PauseTool.CACHE.containsKey(pauseKey)) {
log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version);
continue;
}
String method = (String) input.get(Constants.METHOD);
String requestUrl = (String) input.get(Constants.REQUESTURL);
List<Map<String, Object>> headers = (List<Map<String, Object>>) input.get(Constants.HEADERS);
List<Map<String, Object>> body = (List<Map<String, Object>>) input.get(Constants.BODY);
String bodyContentType = (String) input.get(Constants.BODYCONTENTTYPE);
Map<String, Object> header = new HashMap<String, Object>(16);
for (Map<String, Object> map : headers) {
header.put(map.get("key").toString(), map.get("value"));
}
Map<String, Object> param = new HashMap<String, Object>(16);
for (Map<String, Object> map : body) {
param.put(map.get("key").toString(), map.get("value"));
}
log.info("请求信息----url:{},param:{},header:{}",requestUrl, JSONObject.toJSONString(param), JSONObject.toJSONString(header));
String responseStr = null;
if(method.equalsIgnoreCase(Constants.GET)){
//get请求
responseStr = DownLoadUtil.doGet(requestUrl, header);
}else{
//post请求
if("1".equals(bodyContentType)){
//表单提交
responseStr = DownLoadUtil.doPostFrom(requestUrl, param, header);
}else{
header.put("Content-Type", "application/json");
//json提交
responseStr = DownLoadUtil.doPost(requestUrl, JSONObject.toJSONString(param), header);
}
}
//结果接收
Map<String, Object> result = new HashMap<String, Object>(16);
Map<String, Object> results = new HashMap<String, Object>(16);
results.put(Constants.CONTENT, responseStr);
results.put(Constants.ID, UUID.randomUUID().toString());
results.put("isLast", 1);
result.put(Constants.RESULTS, JSONObject.toJSONString(results));
result.put(Constants.MESSAGE, "成功");
result.put(Constants.STATUS, 1);
task.put(Constants.RESULT, result);
//发送kafka
springBootKafka.send(topic, JSONObject.toJSONString(task));
log.info("数据流转至下游-------");
//周期任务
Map<String, Object> cycle = (Map<String, Object>) input.get(Constants.CYCLE);
String unit = (String) cycle.get(Constants.UNIT);
int duration = (int) cycle.get(Constants.DURATION);
long nextdischTime = System.currentTimeMillis();
if("1".equals(unit)){
nextdischTime += duration*1000;
}else if("2".equals(unit)){
nextdischTime += duration*1000*60;
}else{
nextdischTime += duration*1000*60*60;
}
task.put("nextdischTime", nextdischTime);
ConfigCache.cycleTaskQueue.put(task);
} catch (Throwable e) {
log.error("api服务端处理异常,", e);
//结果集
Map<String, Object> results = new HashMap<String, Object>(16);
Map<String, Object> result = new HashMap<String, Object>(16);
//遍历入库返回结果拼接响应内容
results.put("isLast", 1);
results.put("content", e.getMessage());
result.put(Constants.RESULTS, JSONObject.toJSONString(results));
result.put(Constants.MESSAGE, "异常");
result.put(Constants.STATUS, 2);
task.put(Constants.RESULT, result);
//发送kafka
springBootKafka.send(topic, JSONObject.toJSONString(task));
log.info("数据流转至下游-------");
}
}
}
}

60
src/main/java/com/bfd/app/process/CycleExecPorcess.java

@ -0,0 +1,60 @@
package com.bfd.app.process;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.cache.ConfigCache;
import com.bfd.app.entity.Constants;
import com.bfd.app.utils.DownLoadUtil;
import com.bfd.app.utils.PauseTool;
import com.bfd.app.utils.SpringBootKafka;
/**
* @author jian.mao
* @date 2023年11月8日
* @description
*/
@Component
@Slf4j
public class CycleExecPorcess implements Runnable {
@Autowired
private SpringBootKafka springBootKafka;
@Value("${customize-kafka.producer.topic}")
private String topic;
@Override
public void run() {
while (ConfigCache.isStart) {
Map<String, Object> task = null;
try {
task = ConfigCache.cycleTaskQueue.take();
log.info("周期-任务队列长度:{}",ConfigCache.cycleTaskQueue.size());
log.info("周期-任务:{}",JSONObject.toJSONString(task));
long nextdischTime = (long) task.get("nextdischTime");
if(nextdischTime < System.currentTimeMillis()){
ConfigCache.taskClientQueue.put(task);
}else{
ConfigCache.cycleTaskQueue.put(task);
}
Thread.sleep(1000*2);
} catch (Throwable e) {
log.error("周期任务处理异常,", e);
}
}
}
}

89
src/main/java/com/bfd/app/process/ManagePorcess.java

@ -0,0 +1,89 @@
package com.bfd.app.process;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.cache.ConfigCache;
import com.bfd.app.entity.Constants;
import com.bfd.app.utils.PauseTool;
import com.bfd.app.utils.SpringBootKafka;
/**
* @author jian.mao
* @date 2023年11月8日
* @description
*/
@Component
@Slf4j
public class ManagePorcess implements Runnable {
@Autowired
private SpringBootKafka springBootKafka;
@Value("${customize-kafka.producer.topic}")
private String topic;
@Override
public void run() {
while (ConfigCache.isStart) {
Map<String, Object> task = null;
try {
task = ConfigCache.taskQueue.take();
log.info("任务队列长度:{}",ConfigCache.taskQueue.size());
log.info("任务:{}",JSONObject.toJSONString(task));
//输入配置
Map<String, Object> input = (Map<String, Object>) task.get(Constants.INPUT);
//输出配置
Map<String, Object> output = (Map<String, Object>) task.get(Constants.OUTPUT);
int scenesId = (int) task.get(Constants.SCENES_ID);
int version = (int) task.get(Constants.VERSION);
String pauseKey = scenesId + "_" + version;
if (!PauseTool.CACHE.containsKey(pauseKey)) {
log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version);
continue;
}
//结果接收
Map<String, Object> result = new HashMap<String, Object>(16);
Map<String, Object> results = new HashMap<String, Object>(16);
Map<String, Object> apiResult = (Map<String, Object>) task.get(Constants.RESULT);
for (String key:output.keySet()) {
if(apiResult.containsKey(key)){
results.put(key, apiResult.get(key));
}
}
results.put("isLast", 1);
result.put(Constants.RESULTS, JSONObject.toJSONString(results));
result.put(Constants.MESSAGE, "成功");
result.put(Constants.STATUS, 1);
task.put(Constants.RESULT, result);
//发送kafka
springBootKafka.send(topic, JSONObject.toJSONString(task));
log.info("数据流转至下游-------");
} catch (Throwable e) {
log.error("api服务端处理异常,", e);
//结果集
Map<String, Object> results = new HashMap<String, Object>(16);
Map<String, Object> result = new HashMap<String, Object>(16);
//遍历入库返回结果拼接响应内容
results.put("isLast", 1);
results.put("content", e.getMessage());
result.put(Constants.RESULTS, JSONObject.toJSONString(results));
result.put(Constants.MESSAGE, "异常");
result.put(Constants.STATUS, 2);
task.put(Constants.RESULT, result);
//发送kafka
springBootKafka.send(topic, JSONObject.toJSONString(task));
log.info("数据流转至下游-------");
}
}
}
}

26
src/main/java/com/bfd/app/service/ManageService.java

@ -0,0 +1,26 @@
package com.bfd.app.service;
/**
* api接口控制
* @author jian.mao
* @date 2023年11月8日
* @description
*/
public interface ManageService {
/**
* 服务端
* @param dataJson 接口参数
* @return
*/
public String server(String dataJson);
/**
* 客户端
* @param dataJson
* @return
*/
public String client(String dataJson);
}

84
src/main/java/com/bfd/app/service/impl/ManageServiceImpl.java

@ -0,0 +1,84 @@
package com.bfd.app.service.impl;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.cache.ConfigCache;
import com.bfd.app.entity.Constants;
import com.bfd.app.service.ManageService;
/**
* @author jian.mao
* @date 2024年11月4日
* @description
*/
@Service
@Slf4j
public class ManageServiceImpl implements ManageService{
@Override
public String server(String dataJson) {
Map<String, Object> response = new HashMap<>(16);
int code = 200;
String message = "success";
Map<String, Object> task = null;
try {
task = JSONObject.parseObject(dataJson);
} catch (Exception e) {
log.error("参数结构不合法,", e);
code = 100010;
message = "参数不合法";
}
// 写入队列
try {
if(task.containsKey(Constants.TRACE) && (boolean)task.get(Constants.TRACE)){
ConfigCache.taskQueue.putFirst(task);
}else{
ConfigCache.taskQueue.put(task);
}
} catch (InterruptedException e) {
log.error("任务写入等待队列异常,", e);
code = 100011;
message = "任务写入等待队列失败";
}
response.put(Constants.CODE, code);
response.put(Constants.MESSAGE, message);
return JSONObject.toJSONString(response);
}
@Override
public String client(String dataJson) {
Map<String, Object> response = new HashMap<>(16);
int code = 200;
String message = "success";
Map<String, Object> task = null;
try {
task = JSONObject.parseObject(dataJson);
} catch (Exception e) {
log.error("参数结构不合法,", e);
code = 100010;
message = "参数不合法";
}
// 写入队列
try {
if(task.containsKey(Constants.TRACE) && (boolean)task.get(Constants.TRACE)){
ConfigCache.taskClientQueue.putFirst(task);
}else{
ConfigCache.taskClientQueue.put(task);
}
} catch (InterruptedException e) {
log.error("任务写入等待队列异常,", e);
code = 100011;
message = "任务写入等待队列失败";
}
response.put(Constants.CODE, code);
response.put(Constants.MESSAGE, message);
return JSONObject.toJSONString(response);
}
}

48
src/main/java/com/bfd/app/utils/DataUtil.java

@ -0,0 +1,48 @@
package com.bfd.app.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import java.util.Map;
/**
* @author:jinming
* @className:DataUtil
* @version:1.0
* @description: 获取dataValue的值
* @Date:2023/11/1 9:54
*/
public class DataUtil {
/**
*
* @param key 传入的key
* @param dataMap 数据map
* @return 根据传入的参数进行判断解析返回正确的dataValue
*/
public static Object getValue(String key, Map dataMap) {
Object dataValue;
String isJson = "#json#";
if (key.contains(isJson)) {
//进行第一次拆分获取#json#前面的部分
String[] keySplit = key.split(isJson);
String firstDataKey = keySplit[0];
String[] firstDataKeySplit = firstDataKey.split(":");
//取出前半部分对应的JSON数据并转换为JSONObject
String dataJson = (String) dataMap.get(firstDataKeySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
//根据key的后半部分取出对应JSONObject中的值
String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]);
String secDataKey = keySplit[1];
JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson);
dataValue = JSONPath.eval(firstDataJsonObject, secDataKey);
return dataValue;
}
String[] keySplit = key.split(":");
String jsonPath = keySplit[1];
String dataJson = (String) dataMap.get(keySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
dataValue = JSONPath.eval(dataJsonObject, jsonPath);
return dataValue;
}
}

177
src/main/java/com/bfd/app/utils/DateUtil.java

@ -0,0 +1,177 @@
package com.bfd.app.utils;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
/**
* 日期工具类
*
* @author jian.mao
* @date 2022年11月15日
* @description
*/
@Slf4j
public class DateUtil {
/**
* @return
*/
public static String getTimeStrForNow() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
return sdf.format(new Date());
}
public static String getTimeStrForDay(long time) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
return sdf.format(new Date(time * 1000));
}
public static String getTimeStrForDay() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
return sdf.format(new Date());
}
public static String getDateTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = sdf.format(new Date());
return time;
}
public static String getDateTime(Long timestap) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = sdf.format(new Date(timestap));
return time;
}
public static String getDate(Long timestap) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String time = sdf.format(new Date(timestap));
return time;
}
public static String getDateTimeForMonth() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMM");
String time = sdf.format(new Date());
return time;
}
/**
* 休眠
*
* @param millis 毫秒
*/
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 1. @Description:时间戳转时间
* 2. @Author: ying.zhao
* 3. @Date: 2023/3/28
*/
public static String timestampToDate(String time) {
int thirteen = 13;
int ten = 10;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// if (time.length() == thirteen) {
if (time.length() > ten) {
return sdf.format(new Date(Long.parseLong(time)));
} else {
return sdf.format(new Date(Integer.parseInt(time) * 1000L));
}
}
public static String parseCreated(String jsonTime){
String formattedDateTime = getDateTime();
try {
// 使用fastjson解析JSON数据
JSONObject jsonObject = JSON.parseObject(jsonTime);
// 获取日期和时间的值
JSONObject dateObject = jsonObject.getJSONObject("date");
int day = dateObject.getIntValue("day");
int month = dateObject.getIntValue("month");
int year = dateObject.getIntValue("year");
JSONObject timeObject = jsonObject.getJSONObject("time");
int hour = timeObject.getIntValue("hour");
int minute = timeObject.getIntValue("minute");
int second = timeObject.getIntValue("second");
// 创建LocalDateTime对象
LocalDateTime dateTime = LocalDateTime.of(year, month, day, hour, minute, second);
// 定义日期时间格式化器
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 格式化日期时间
formattedDateTime = dateTime.format(formatter);
} catch (Exception e) {
log.info("日期转换失败:{}",e);
}
return formattedDateTime;
}
/**
* 字符串转换日期
* @param format
* @param date
* @return
*/
public static Date strToDate(String format,String date){
SimpleDateFormat sdf = new SimpleDateFormat(format);
if (date == null || date.equals("")){
return new Date();
}else{
Date ru = null;
try {
ru = sdf.parse(date);
} catch (ParseException e) {
e.printStackTrace();
}
return ru;
}
}
/**
* 日期格式话
* @param format 日期格式
* @param dater 要转换的日期,默认当前时间
* @return
*/
public static String FormatDate(String format,Date date){
String fromatDate = null;
SimpleDateFormat sdf = new SimpleDateFormat(format);
if (date == null){
fromatDate = sdf.format(new Date());
}else{
fromatDate = sdf.format(date);
}
return fromatDate;
}
public static void main(String[] args) {
String time = timestampToDate("955814400000");
System.out.println(time);
}
}

1060
src/main/java/com/bfd/app/utils/DownLoadUtil.java
File diff suppressed because it is too large
View File

27
src/main/java/com/bfd/app/utils/EncryptionUtil.java

@ -0,0 +1,27 @@
package com.bfd.app.utils;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* @author jian.mao
* @date 2023年3月10日
* @description
*/
public class EncryptionUtil {
public static String md5(String text) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(text.getBytes());
byte[] bytes = md.digest();
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b & 0xff));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return null;
}
}
}

36
src/main/java/com/bfd/app/utils/FileUtil.java

@ -0,0 +1,36 @@
package com.bfd.app.utils;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 文件工具类
* @author jian.mao
* @date 2023年7月14日
* @description
*/
public class FileUtil {
/**
* 数据写入文件
* @param Path 文件路径
* @param result 数据
* @throws IOException
*/
public static void writeFile(String path,String result){
try {
FileWriter fw = new FileWriter(path,true);
fw.write(result+"\n");
fw.flush();
fw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

53
src/main/java/com/bfd/app/utils/GPTResultParseUtil.java

@ -0,0 +1,53 @@
package com.bfd.app.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author:jinming
* @className:GPTResultParseUtil
* @version:1.0
* @description:
* @Date:2024/6/28 10:11
*/
public class GPTResultParseUtil {
public static Map<String, Object> parseGPTResult(Map<String, Object> output, String gptContent) {
Map<String, Object> jsonResult = new HashMap<>();
try {
// 替换```json, ``` \n
String jsonContent = gptContent.replace("```json", "").replace("```", "").replace("\n", "");
JSONObject jsonGPT = JSON.parseObject(jsonContent);
for (String key : output.keySet()) {
if (jsonGPT.containsKey(key)) {
jsonResult.put(key, jsonGPT.get(key));
}
}
return jsonResult;
} catch (JSONException e) {
try {
// 直接解析失败使用正则表达式匹配外层的 {}
Pattern pattern = Pattern.compile("\\{.*\\}", Pattern.DOTALL);
Matcher matcher = pattern.matcher(gptContent.replace("\n", ""));
if (matcher.find()) {
JSONObject jsonGPT = JSON.parseObject(matcher.group());
for (String key : output.keySet()) {
if (jsonGPT.containsKey(key)) {
jsonResult.put(key, jsonGPT.get(key));
}
}
return jsonResult;
} else {
return null;
}
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
}
}

32
src/main/java/com/bfd/app/utils/JsonUtil.java

@ -0,0 +1,32 @@
package com.bfd.app.utils;
import com.alibaba.fastjson.JSONObject;
import com.bfd.app.entity.Constants;
/**
* json工具
* @author jian.mao
* @date 2023年7月10日
* @description
*/
public class JsonUtil {
/**
* 校验字符串是list/map/str
* @param jsonString
* @return
*/
public static String checkJsonType(String jsonString) {
try {
JSONObject.parseObject(jsonString);
return Constants.MAP_TYPE;
} catch (Exception e) {
try {
JSONObject.parseArray(jsonString);
return Constants.LIST_TYPE;
} catch (Exception ex) {
return Constants.STRING_TYPE;
}
}
}
}

33
src/main/java/com/bfd/app/utils/OtherUtils.java

@ -0,0 +1,33 @@
package com.bfd.app.utils;
import java.security.MessageDigest;
/**
* 其他工具类
* @author jian.mao
* @date 2023年9月19日
* @description
*/
public class OtherUtils {
public static String getMd5(String string) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] bs = md5.digest(string.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder(40);
for (byte x : bs) {
if ((x & 0xff) >> 4 == 0) {
sb.append("0").append(Integer.toHexString(x & 0xff));
} else {
sb.append(Integer.toHexString(x & 0xff));
}
}
return sb.toString();
} catch (Exception e) {
return "nceaform" + System.currentTimeMillis();
}
}
}

92
src/main/java/com/bfd/app/utils/PauseTool.java

@ -0,0 +1,92 @@
package com.bfd.app.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author:jinming
* @className:ZookeeperNodeMonitor
* @version:1.0
* @description: Zookeeper节点监听和Redis初始化工具类
* @Date:2024/7/2 14:20
*/
@Component
@Slf4j
public class PauseTool {
// 本地缓存
public static final HashMap<String, String> CACHE = new HashMap<>();
/**
* 初始化Redis中的version_*键并加载到本地缓存
*/
public void initializeRedisCache(StringRedisTemplate stringRedisTemplate) {
try {
Set<String> keys = stringRedisTemplate.keys("version_*");
if (keys != null) {
for (String key : keys) {
String value = stringRedisTemplate.opsForValue().get(key);
if (value != null) {
String sincesId = key.split("_")[1];
CACHE.put(sincesId.concat("_").concat(value), value);
}
}
}
log.info("当前缓存version信息:{}", JSON.toJSON(CACHE));
} catch (Exception e) {
log.error("Error initializing Redis cache", e);
}
}
public void setupZookeeperListener(String connectionString, String nodePath) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
try {
// 创建节点监听器
NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
nodeCache.start();
log.info("数据监听已启动");
// 监听节点变化
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
try {
String nodeData = new String(data);
log.info("Node data changed: " + nodeData);
// 解析JSON数据
JSONObject jsonObject = JSON.parseObject(nodeData);
int scenesId = jsonObject.getIntValue("scenes_id");
int version = jsonObject.getIntValue("version");
String newKey = scenesId + "_" + version;
// 移除CACHE中所有以scenesId开头的key
CACHE.keySet().removeIf(key -> key.startsWith(scenesId + "_"));
// 将新的key放入CACHE
CACHE.put(newKey, String.valueOf(version));
log.info("当前缓存version信息:{}", JSON.toJSON(CACHE));
} catch (Exception e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
log.error("Error setting up Zookeeper listener", e);
}
}
}

18
src/main/java/com/bfd/app/utils/QueueUtil.java

@ -0,0 +1,18 @@
package com.bfd.app.utils;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author:jinming
* @className:QueueUtil
* @version:1.0
* @description:
* @Date:2023/7/13 15:00
*/
public class QueueUtil {
public static LinkedBlockingDeque<Map<String, Object>> taskQueue = new LinkedBlockingDeque<Map<String, Object>>();
public static LinkedBlockingDeque<String> sendQueue = new LinkedBlockingDeque<String>();
}

46
src/main/java/com/bfd/app/utils/SpringBootKafka.java

@ -0,0 +1,46 @@
package com.bfd.app.utils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @PROJECT_NAME: companybusinesscrawl
* @DESCRIPTION:SpringBootKafka 工具类
* @AUTHOR: ying.zhao
* @DATE: 2023/4/6 11:09
*/
@Slf4j
@Component
public class SpringBootKafka {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 自定义topicKafkaTemplate
*/
/**
* public static final String TOPIC = "companyBussTest";
**/
public void send(String topic, String message) {
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(topic + " - 生产者 发送消息成功" );
}
});
}
}

23
src/main/java/com/bfd/app/utils/ThrowMessageUtil.java

@ -0,0 +1,23 @@
package com.bfd.app.utils;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* @author jian.mao
* @date 2023年3月22日
* @description
*/
public class ThrowMessageUtil {
/**
* 获取异常信息
* @param t
* @return
*/
public static String getErrmessage(Throwable t){
StringWriter stringWriter=new StringWriter();
t.printStackTrace(new PrintWriter(stringWriter,true));
return stringWriter.getBuffer().toString();
}
}

97
src/main/resources/application.yml

@ -0,0 +1,97 @@
logging:
level:
root: info
path: ../logs
server:
port: 8023
servlet:
context-path: /api
tomcat:
uri-encoding: utf-8
max-connections: 20000
max-http-form-post-size: 1
max-threads: 1000
spring:
application:
name: apiManager
kafka:
bootstrap-servers: 172.16.12.55:9092,172.16.12.56:9092,172.16.12.57:9092
producer:
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
max-request-size: 52428800 # 设置为 50 MB
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: true
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#消费组
group-id: test4
#消费者并发线程数
concurrency: 4
#超时时间
max-poll-interval-ms: 60000
#listener:
# 在侦听器容器中运行的线程数。
#concurrency: 5
#listner负责ack,每调用一次,就立即commit
#ack-mode: manual_immediate
#missing-topics-fatal: false
boot:
admin:
client:
url: http://172.16.12.55:8001
instance:
service-base-url: http://172.16.12.56:8023
redis:
host: 172.24.12.126
port: 6379
timeout: 10000
database: 5
jedis:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 2 # 连接池中的最小空闲连接
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
health:
elasticsearch:
enabled: false
customize-kafka:
producer:
topic: analyze
task:
task-queue-path: ../data/taskQueue.txt
thread-num: 1
zookeeper:
connection-string: 172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181
publish-node: /analyze

36
src/main/resources/logback-spring.xml

@ -0,0 +1,36 @@
<configuration>
<!-- 属性文件:在properties文件中找到对应的配置项 -->
<springProperty scope="context" name="logging.path" source="logging.path"/>
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 -->
<!-- <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</Pattern>
</encoder>
</appender> -->
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.path}/api_managerInfo.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.path}/api_managerInfo.log.%d{yyyy-MM-dd}</FileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<!-- <appender-ref ref="STDOUT"/> -->
</root>
</configuration>

20
src/test/java/com/bfd/AppTest.java

@ -0,0 +1,20 @@
package com.bfd;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
/**
* Unit test for simple App.
*/
public class AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue()
{
assertTrue( true );
}
}
Loading…
Cancel
Save