commit
56309d44e0
21 changed files with 1390 additions and 0 deletions
-
40.classpath
-
2.gitignore
-
23.project
-
4.settings/org.eclipse.core.resources.prefs
-
9.settings/org.eclipse.jdt.core.prefs
-
4.settings/org.eclipse.m2e.core.prefs
-
1README.md
-
173pom.xml
-
23src/main/java/com/bfd/dataprocess/DataProcessApplication.java
-
32src/main/java/com/bfd/dataprocess/config/Constant.java
-
64src/main/java/com/bfd/dataprocess/controller/ApiController.java
-
19src/main/java/com/bfd/dataprocess/service/ProcessService.java
-
543src/main/java/com/bfd/dataprocess/service/impl/ProcessServiceImpl.java
-
37src/main/java/com/bfd/dataprocess/util/AsyncConfig.java
-
65src/main/java/com/bfd/dataprocess/util/DataUtil.java
-
105src/main/java/com/bfd/dataprocess/util/DynamicDataSource.java
-
81src/main/java/com/bfd/dataprocess/util/KfkUtil.java
-
25src/main/java/com/bfd/dataprocess/util/ZookeeperConfig.java
-
65src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java
-
37src/main/resources/application.yml
-
38src/main/resources/logback-spring.xml
@ -0,0 +1,40 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<classpath> |
||||
|
<classpathentry kind="src" output="target/classes" path="src/main/java"> |
||||
|
<attributes> |
||||
|
<attribute name="optional" value="true"/> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> |
||||
|
<attributes> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
<attribute name="optional" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry kind="src" output="target/test-classes" path="src/test/java"> |
||||
|
<attributes> |
||||
|
<attribute name="optional" value="true"/> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
<attribute name="test" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"> |
||||
|
<attributes> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
<attribute name="test" value="true"/> |
||||
|
<attribute name="optional" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"> |
||||
|
<attributes> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"> |
||||
|
<attributes> |
||||
|
<attribute name="maven.pomderived" value="true"/> |
||||
|
</attributes> |
||||
|
</classpathentry> |
||||
|
<classpathentry kind="output" path="target/classes"/> |
||||
|
</classpath> |
@ -0,0 +1,2 @@ |
|||||
|
/target/ |
||||
|
/logs/ |
@ -0,0 +1,23 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<projectDescription> |
||||
|
<name>mysql_app</name> |
||||
|
<comment></comment> |
||||
|
<projects> |
||||
|
</projects> |
||||
|
<buildSpec> |
||||
|
<buildCommand> |
||||
|
<name>org.eclipse.jdt.core.javabuilder</name> |
||||
|
<arguments> |
||||
|
</arguments> |
||||
|
</buildCommand> |
||||
|
<buildCommand> |
||||
|
<name>org.eclipse.m2e.core.maven2Builder</name> |
||||
|
<arguments> |
||||
|
</arguments> |
||||
|
</buildCommand> |
||||
|
</buildSpec> |
||||
|
<natures> |
||||
|
<nature>org.eclipse.jdt.core.javanature</nature> |
||||
|
<nature>org.eclipse.m2e.core.maven2Nature</nature> |
||||
|
</natures> |
||||
|
</projectDescription> |
@ -0,0 +1,4 @@ |
|||||
|
eclipse.preferences.version=1 |
||||
|
encoding//src/main/java=UTF-8 |
||||
|
encoding//src/main/resources=UTF-8 |
||||
|
encoding/<project>=UTF-8 |
@ -0,0 +1,9 @@ |
|||||
|
eclipse.preferences.version=1 |
||||
|
org.eclipse.jdt.core.compiler.codegen.methodParameters=generate |
||||
|
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 |
||||
|
org.eclipse.jdt.core.compiler.compliance=1.8 |
||||
|
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled |
||||
|
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning |
||||
|
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore |
||||
|
org.eclipse.jdt.core.compiler.release=disabled |
||||
|
org.eclipse.jdt.core.compiler.source=1.8 |
@ -0,0 +1,4 @@ |
|||||
|
activeProfiles= |
||||
|
eclipse.preferences.version=1 |
||||
|
resolveWorkspaceProjects=true |
||||
|
version=1 |
@ -0,0 +1 @@ |
|||||
|
mysql应用 java版本 |
@ -0,0 +1,173 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" |
||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
|
<modelVersion>4.0.0</modelVersion> |
||||
|
<parent> |
||||
|
<groupId>org.springframework.boot</groupId> |
||||
|
<artifactId>spring-boot-starter-parent</artifactId> |
||||
|
<version>2.2.4.RELEASE</version> |
||||
|
<relativePath/> <!-- lookup parent from repository --> |
||||
|
</parent> |
||||
|
<groupId>com.bfd</groupId> |
||||
|
<artifactId>data_process</artifactId> |
||||
|
<version>1.0-SNAPSHOT</version> |
||||
|
|
||||
|
<properties> |
||||
|
<java.version>1.8</java.version> |
||||
|
</properties> |
||||
|
<dependencies> |
||||
|
<dependency> |
||||
|
<groupId>org.springframework.boot</groupId> |
||||
|
<artifactId>spring-boot-starter-web</artifactId> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>mysql</groupId> |
||||
|
<artifactId>mysql-connector-java</artifactId> |
||||
|
<scope>runtime</scope> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.projectlombok</groupId> |
||||
|
<artifactId>lombok</artifactId> |
||||
|
<optional>true</optional> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.springframework.boot</groupId> |
||||
|
<artifactId>spring-boot-starter-test</artifactId> |
||||
|
<scope>test</scope> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.springframework</groupId> |
||||
|
<artifactId>spring-jdbc</artifactId> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>com.zaxxer</groupId> |
||||
|
<artifactId>HikariCP</artifactId> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>com.alibaba.fastjson2</groupId> |
||||
|
<artifactId>fastjson2</artifactId> |
||||
|
<version>2.0.12</version> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>cn.hutool</groupId> |
||||
|
<artifactId>hutool-all</artifactId> |
||||
|
<version>5.8.27</version> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.apache.kafka</groupId> |
||||
|
<artifactId>kafka-clients</artifactId> |
||||
|
<version>2.7.1</version> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>de.codecentric</groupId> |
||||
|
<artifactId>spring-boot-admin-client</artifactId> |
||||
|
<version>2.2.4</version> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.apache.curator</groupId> |
||||
|
<artifactId>curator-framework</artifactId> |
||||
|
<version>5.2.0</version> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>org.apache.curator</groupId> |
||||
|
<artifactId>curator-recipes</artifactId> |
||||
|
<version>5.2.0</version> |
||||
|
</dependency> |
||||
|
</dependencies> |
||||
|
<!-- <build>--> |
||||
|
<!-- <plugins>--> |
||||
|
<!-- <plugin>--> |
||||
|
<!-- <groupId>org.springframework.boot</groupId>--> |
||||
|
<!-- <artifactId>spring-boot-maven-plugin</artifactId>--> |
||||
|
<!-- <configuration>--> |
||||
|
<!-- <excludes>--> |
||||
|
<!-- <exclude>--> |
||||
|
<!-- <groupId>org.projectlombok</groupId>--> |
||||
|
<!-- <artifactId>lombok</artifactId>--> |
||||
|
<!-- </exclude>--> |
||||
|
<!-- </excludes>--> |
||||
|
<!-- </configuration>--> |
||||
|
<!-- </plugin>--> |
||||
|
|
||||
|
<!-- <!– 使用 Maven Assembly 插件 –>--> |
||||
|
|
||||
|
<!-- </plugins>--> |
||||
|
<!-- </build>--> |
||||
|
<build> |
||||
|
<plugins> |
||||
|
<plugin> |
||||
|
<groupId>org.apache.maven.plugins</groupId> |
||||
|
<artifactId>maven-jar-plugin</artifactId> |
||||
|
<configuration> |
||||
|
<!--不打入jar包的文件类型或者路径--> |
||||
|
<excludes> |
||||
|
<exclude>*.properties</exclude> |
||||
|
<exclude>*.yml</exclude> |
||||
|
<exclude>*.yaml</exclude> |
||||
|
</excludes> |
||||
|
<archive> |
||||
|
<manifest> |
||||
|
<!-- 执行的主程序路径 --> |
||||
|
<mainClass>com.bfd.dataprocess.DataProcessApplication</mainClass> |
||||
|
<!--是否要把第三方jar放到manifest的classpath中--> |
||||
|
<addClasspath>true</addClasspath> |
||||
|
<!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/--> |
||||
|
<classpathPrefix>lib/</classpathPrefix> |
||||
|
<!-- 打包时 MANIFEST.MF 文件不记录的时间戳版本 --> |
||||
|
<useUniqueVersions>false</useUniqueVersions> |
||||
|
</manifest> |
||||
|
<manifestEntries> |
||||
|
<!-- 在 Class-Path 下添加配置文件的路径 --> |
||||
|
<Class-Path>config/</Class-Path> |
||||
|
</manifestEntries> |
||||
|
</archive> |
||||
|
</configuration> |
||||
|
</plugin> |
||||
|
<plugin> |
||||
|
<groupId>org.apache.maven.plugins</groupId> |
||||
|
<artifactId>maven-dependency-plugin</artifactId> |
||||
|
<executions> |
||||
|
<execution> |
||||
|
<id>copy</id> |
||||
|
<phase>package</phase> |
||||
|
<goals> |
||||
|
<goal>copy-dependencies</goal> |
||||
|
</goals> |
||||
|
<configuration> |
||||
|
<outputDirectory>${project.build.directory}/lib/</outputDirectory> |
||||
|
</configuration> |
||||
|
</execution> |
||||
|
</executions> |
||||
|
</plugin> |
||||
|
|
||||
|
<plugin> |
||||
|
<artifactId>maven-resources-plugin</artifactId> |
||||
|
<executions> |
||||
|
<execution> |
||||
|
<id>copy-resources</id> |
||||
|
<phase>package</phase> |
||||
|
<goals> |
||||
|
<goal>copy-resources</goal> |
||||
|
</goals> |
||||
|
<configuration> |
||||
|
<resources> |
||||
|
<!--把配置文件打包到指定路径--> |
||||
|
<resource> |
||||
|
<directory>src/main/resources/</directory> |
||||
|
<includes> |
||||
|
<include>*.properties</include> |
||||
|
<include>*.yml</include> |
||||
|
<exclude>*.yaml</exclude> |
||||
|
</includes> |
||||
|
</resource> |
||||
|
</resources> |
||||
|
<outputDirectory>${project.build.directory}/config</outputDirectory> |
||||
|
</configuration> |
||||
|
</execution> |
||||
|
</executions> |
||||
|
</plugin> |
||||
|
</plugins> |
||||
|
</build> |
||||
|
|
||||
|
</project> |
@ -0,0 +1,23 @@ |
|||||
|
package com.bfd.dataprocess; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.boot.SpringApplication; |
||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication; |
||||
|
import org.springframework.context.ConfigurableApplicationContext; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
*/ |
||||
|
|
||||
|
@SpringBootApplication |
||||
|
@Slf4j |
||||
|
public class DataProcessApplication { |
||||
|
public static void main(String[] args) { |
||||
|
ConfigurableApplicationContext applicationContext = SpringApplication.run(DataProcessApplication.class, args); |
||||
|
applicationContext.getBean(DataProcessApplication.class).start(); |
||||
|
} |
||||
|
|
||||
|
public void start(){ |
||||
|
|
||||
|
} |
||||
|
} |
@ -0,0 +1,32 @@ |
|||||
|
package com.bfd.dataprocess.config; |
||||
|
|
||||
|
/** |
||||
|
* @author:jinming |
||||
|
* @className:Constant |
||||
|
* @version:1.0 |
||||
|
* @description: |
||||
|
* @Date:2023/8/16 15:26 |
||||
|
*/ |
||||
|
public class Constant { |
||||
|
/** |
||||
|
* 空字符串常量 |
||||
|
*/ |
||||
|
public static final String EMPTY = ""; |
||||
|
|
||||
|
/** |
||||
|
* 不需要DataUtil解析的Key |
||||
|
*/ |
||||
|
public static final String NOT_KEY = ":$"; |
||||
|
|
||||
|
public final static String STOP = "stop"; |
||||
|
|
||||
|
public final static String OR = "or"; |
||||
|
|
||||
|
public final static String SELECT = "select"; |
||||
|
|
||||
|
public final static String OPERATION = "operation"; |
||||
|
|
||||
|
public final static String CHAR = "`"; |
||||
|
|
||||
|
public final static String TRACE= "trace"; |
||||
|
} |
@ -0,0 +1,64 @@ |
|||||
|
package com.bfd.dataprocess.controller; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import com.bfd.dataprocess.service.ProcessService; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.web.bind.annotation.*; |
||||
|
|
||||
|
import javax.annotation.Resource; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
*/ |
||||
|
@RestController |
||||
|
@Slf4j |
||||
|
@RequestMapping(value = "/api") |
||||
|
@CrossOrigin(origins = "*", maxAge = 3600) |
||||
|
public class ApiController { |
||||
|
|
||||
|
@Resource |
||||
|
ProcessService processService; |
||||
|
|
||||
|
/** |
||||
|
* insert Api |
||||
|
* @param jsonObject |
||||
|
* @return |
||||
|
*/ |
||||
|
@RequestMapping(value = "/insertData", method = RequestMethod.POST, produces = "application/json") |
||||
|
@ResponseBody |
||||
|
public String insertData(@RequestBody JSONObject jsonObject) { |
||||
|
processService.insertData(jsonObject); |
||||
|
return "success"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* query Api |
||||
|
* @param jsonObject |
||||
|
* @return |
||||
|
*/ |
||||
|
@RequestMapping(value = "/queryData", method = RequestMethod.POST, produces = "application/json") |
||||
|
@ResponseBody |
||||
|
public String queryData(@RequestBody JSONObject jsonObject) { |
||||
|
processService.queryData(jsonObject); |
||||
|
return "success"; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* query field Api |
||||
|
* @param jsonObject |
||||
|
* @return |
||||
|
*/ |
||||
|
@RequestMapping(value = "/queryField", method = RequestMethod.POST, produces = "application/json") |
||||
|
@ResponseBody |
||||
|
public String queryField(@RequestBody JSONObject jsonObject) { |
||||
|
processService.queryField(jsonObject); |
||||
|
return "success"; |
||||
|
} |
||||
|
|
||||
|
@RequestMapping(value = "/sqlExecution", method = RequestMethod.POST, produces = "application/json") |
||||
|
@ResponseBody |
||||
|
public String sqlImpl(@RequestBody JSONObject jsonObject) { |
||||
|
processService.sqlImpl(jsonObject); |
||||
|
return "success"; |
||||
|
} |
||||
|
} |
@ -0,0 +1,19 @@ |
|||||
|
package com.bfd.dataprocess.service; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
*/ |
||||
|
@Service |
||||
|
public interface ProcessService { |
||||
|
|
||||
|
void insertData(JSONObject jsonObject); |
||||
|
|
||||
|
void queryData(JSONObject jsonObject); |
||||
|
|
||||
|
void queryField(JSONObject jsonObject); |
||||
|
|
||||
|
void sqlImpl(JSONObject jsonObject); |
||||
|
} |
@ -0,0 +1,543 @@ |
|||||
|
package com.bfd.dataprocess.service.impl; |
||||
|
|
||||
|
import cn.hutool.core.util.IdUtil; |
||||
|
import com.alibaba.fastjson2.*; |
||||
|
import com.bfd.dataprocess.config.Constant; |
||||
|
import com.bfd.dataprocess.service.ProcessService; |
||||
|
import com.bfd.dataprocess.util.DataUtil; |
||||
|
import com.bfd.dataprocess.util.DynamicDataSource; |
||||
|
import com.bfd.dataprocess.util.KfkUtil; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.scheduling.annotation.Async; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
import java.sql.*; |
||||
|
import java.util.*; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
*/ |
||||
|
@Service |
||||
|
@Slf4j |
||||
|
public class ProcessServiceImpl implements ProcessService { |
||||
|
@Autowired |
||||
|
DynamicDataSource dynamicDataSource; |
||||
|
|
||||
|
@Override |
||||
|
public void insertData(JSONObject jsonObject) { |
||||
|
Map staMap = new HashMap<>(16); |
||||
|
JSONObject data = jsonObject.getJSONObject("data"); |
||||
|
//输入 |
||||
|
JSONObject input = jsonObject.getJSONObject("input"); |
||||
|
log.info("insertData ---> input:" + JSON.toJSONString(input)); |
||||
|
log.info("insertData ---> data:" + JSON.toJSONString(data)); |
||||
|
//数据库信息 |
||||
|
JSONObject conn_info = input.getJSONObject("conn_info"); |
||||
|
//场景id |
||||
|
Integer scenes_id = jsonObject.getInteger("scenes_id"); |
||||
|
//字段 |
||||
|
JSONArray form = input.getJSONArray("form"); |
||||
|
Map resultMap = new HashMap<>(16); |
||||
|
resultMap.put("id", IdUtil.randomUUID()); |
||||
|
try { |
||||
|
// 构建 INSERT 语句的 SQL 字符串 |
||||
|
StringBuilder sqlBuilder = new StringBuilder(); |
||||
|
sqlBuilder.append("INSERT INTO ").append(conn_info.getString("table")).append(" ("); |
||||
|
StringBuilder valuesBuilder = new StringBuilder(") VALUES ("); |
||||
|
List valueList = new ArrayList<>(); |
||||
|
form.forEach(value -> { |
||||
|
JSONObject item = JSONObject.parseObject(JSON.toJSONString(value)); |
||||
|
String fieldName = item.getString("field"); |
||||
|
String fieldValue = item.get("value").toString(); |
||||
|
if (fieldValue.isEmpty()) { |
||||
|
log.info("字段:" + fieldName + ",值为空,fieldValue:" + fieldValue); |
||||
|
return; |
||||
|
} |
||||
|
sqlBuilder.append(Constant.CHAR).append(fieldName).append(Constant.CHAR).append(", "); |
||||
|
String[] fieldValue_split = fieldValue.split(":"); |
||||
|
if (fieldValue_split.length > 1) { |
||||
|
JSONObject fieldData = data.getJSONObject(fieldValue_split[0]); |
||||
|
//.getString(fieldValue_split[1]); |
||||
|
fieldValue = JSONPath.eval(fieldData, fieldValue_split[1]).toString(); |
||||
|
} |
||||
|
valuesBuilder.append(" ? ").append(", "); |
||||
|
valueList.add(fieldValue); |
||||
|
}); |
||||
|
// 删除最后一个逗号和空格 |
||||
|
sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length()); |
||||
|
valuesBuilder.delete(valuesBuilder.length() - 2, valuesBuilder.length()); |
||||
|
|
||||
|
sqlBuilder.append(valuesBuilder).append(")"); |
||||
|
String sql = sqlBuilder.toString(); |
||||
|
// System.out.println("Generated SQL: " + sql); |
||||
|
log.info("Generated SQL: " + sql); |
||||
|
// 创建 PreparedStatement 对象 |
||||
|
try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id); |
||||
|
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { |
||||
|
for (int i = 0; i < valueList.size(); i++) { |
||||
|
// 根据字段的顺序设置参数值 |
||||
|
preparedStatement.setObject(i + 1, valueList.get(i).toString()); |
||||
|
} |
||||
|
|
||||
|
// System.out.println("Finall Generated INSERT SQL: " + preparedStatement.toString()); |
||||
|
log.info("Finall Generated INSERT SQL: " + preparedStatement.toString()); |
||||
|
// 执行插入操作 |
||||
|
int rowsInserted = preparedStatement.executeUpdate(); |
||||
|
// System.out.println("Rows inserted: " + rowsInserted); |
||||
|
log.info("Rows inserted: " + rowsInserted); |
||||
|
|
||||
|
if (rowsInserted > 0) { |
||||
|
resultMap.put("content", "写入成功"); |
||||
|
staMap.put("status",1); |
||||
|
staMap.put("message","成功"); |
||||
|
} else { |
||||
|
resultMap.put("content", "写入失败"); |
||||
|
staMap.put("status",2); |
||||
|
staMap.put("message","失败"); |
||||
|
} |
||||
|
} catch (SQLException sqlException) { |
||||
|
sqlException.printStackTrace(); |
||||
|
log.error("执行插入失败", sqlException); |
||||
|
resultMap.put("content", "失败"); |
||||
|
staMap.put("status",2); |
||||
|
staMap.put("message","失败,sql异常"); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} catch (Throwable e) { |
||||
|
log.error("insert error", e); |
||||
|
e.printStackTrace(); |
||||
|
resultMap.put("content", "写入失败"); |
||||
|
staMap.put("status",2); |
||||
|
staMap.put("message","失败"); |
||||
|
} |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
resultMap.put("isLast",1); |
||||
|
results.put("results", JSON.toJSONString(resultMap)); |
||||
|
results.putAll(staMap); |
||||
|
jsonObject.put("result", results); |
||||
|
|
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(resultMap)); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
@Async |
||||
|
@Override |
||||
|
public void queryData(JSONObject jsonObject) { |
||||
|
// PreparedStatement preparedStatement = null; |
||||
|
// Connection connection = null; |
||||
|
try { |
||||
|
Map dataMap = jsonObject.getObject("data", Map.class); |
||||
|
//输入 |
||||
|
JSONObject input = jsonObject.getJSONObject("input"); |
||||
|
//输出 |
||||
|
JSONObject output = jsonObject.getJSONObject("output"); |
||||
|
log.info("queryData ---> input:" + JSON.toJSONString(input)); |
||||
|
log.info("queryData ---> output:" + JSON.toJSONString(output)); |
||||
|
//数据库信息 |
||||
|
JSONObject conn_info = input.getJSONObject("conn_info"); |
||||
|
//查询条件 |
||||
|
JSONObject condition = input.getJSONObject("condition"); |
||||
|
//场景id |
||||
|
Integer scenes_id = jsonObject.getInteger("scenes_id"); |
||||
|
// 构建 SELECT 语句的 SQL 字符串 |
||||
|
StringBuilder sqlBuilder = new StringBuilder(); |
||||
|
sqlBuilder.append("SELECT "); |
||||
|
StringBuilder conditionBuilder = new StringBuilder(" WHERE 1=1 and "); |
||||
|
//查询字段 |
||||
|
for (Map.Entry<String, Object> entry : output.entrySet()) { |
||||
|
// System.out.println("Key: " + entry.getKey() + ", Value: " + entry.getValue()); |
||||
|
sqlBuilder.append(Constant.CHAR).append(entry.getKey()).append(Constant.CHAR).append(", "); |
||||
|
} |
||||
|
|
||||
|
// 删除最后一个逗号和空格 |
||||
|
sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length()); |
||||
|
//拼接表名 |
||||
|
sqlBuilder.append(" FROM ").append(conn_info.getString("table")); |
||||
|
if (condition.containsKey(Constant.OR)) { |
||||
|
JSONArray or = condition.getJSONArray("or"); |
||||
|
or.forEach(value_or -> { |
||||
|
StringBuilder orBuilder = new StringBuilder(); |
||||
|
ArrayList item_or = (ArrayList) value_or; |
||||
|
item_or.forEach(value_and -> { |
||||
|
JSONObject item_and = JSONObject.parseObject(JSON.toJSONString(value_and)); |
||||
|
String value = item_and.getString("value"); |
||||
|
if (value.isEmpty()) { |
||||
|
return; |
||||
|
} |
||||
|
String dataForValue = (String) DataUtil.getValue(value, dataMap); |
||||
|
// JSONObject item_and = (JSONObject) value_and; |
||||
|
StringBuilder andBuilder = new StringBuilder() |
||||
|
//避免关键字,添加字符 |
||||
|
.append(Constant.CHAR) |
||||
|
.append(item_and.getString("field")) |
||||
|
.append(Constant.CHAR) |
||||
|
.append(item_and.getString("compare_symbol")) |
||||
|
.append("'" + dataForValue + "'") |
||||
|
.append(" and "); |
||||
|
orBuilder.append(andBuilder); |
||||
|
}); |
||||
|
if (orBuilder.length()==0){ |
||||
|
return; |
||||
|
} |
||||
|
//去掉最后一个and |
||||
|
orBuilder.delete(orBuilder.length() - 5, orBuilder.length()); |
||||
|
|
||||
|
//如果有or条件,再拼接括号 |
||||
|
if (or.size() > 1) { |
||||
|
orBuilder.insert(0, " ( "); |
||||
|
orBuilder.append(" )"); |
||||
|
} |
||||
|
|
||||
|
conditionBuilder.append(orBuilder).append(" or "); |
||||
|
|
||||
|
}); |
||||
|
//去掉最后一个or |
||||
|
|
||||
|
conditionBuilder.delete(conditionBuilder.length() - 4, conditionBuilder.length()); |
||||
|
|
||||
|
//组装SQL 条件 |
||||
|
sqlBuilder.append(conditionBuilder); |
||||
|
} |
||||
|
// 每批处理的行数 |
||||
|
int batchSize = 1000; |
||||
|
// 初始偏移量 |
||||
|
int offset = 0; |
||||
|
if (jsonObject.containsKey(Constant.TRACE) && jsonObject.getBoolean(Constant.TRACE)==true){ |
||||
|
log.info("测试流程,限制最多1条"); |
||||
|
sqlBuilder.append(" LIMIT 1"); |
||||
|
}else { |
||||
|
sqlBuilder.append(" LIMIT " + batchSize); |
||||
|
} |
||||
|
//获取数据库连接 |
||||
|
try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id)) { |
||||
|
int rowCount = 0; |
||||
|
while (true) { |
||||
|
sqlBuilder.append(" OFFSET " + offset); |
||||
|
String sql = sqlBuilder.toString(); |
||||
|
// System.out.println("Generated Query SQL: " + sql); |
||||
|
log.info("Generated Query SQL: " + sql); |
||||
|
// connection = dynamicDataSource.getConnection(conn_info); |
||||
|
// 创建 PreparedStatement 对象 |
||||
|
// preparedStatement = connection.prepareStatement(sql); |
||||
|
// 执行查询 |
||||
|
try (PreparedStatement preparedStatement = connection.prepareStatement(sql); |
||||
|
ResultSet resultSet = preparedStatement.executeQuery()) { |
||||
|
// 获取总行数 |
||||
|
resultSet.last(); |
||||
|
int totalRows = resultSet.getRow(); |
||||
|
resultSet.beforeFirst(); |
||||
|
int currentRow = 0; |
||||
|
// 遍历结果集并处理数据 |
||||
|
while (resultSet.next()) { |
||||
|
currentRow++; |
||||
|
Map<String, Object> rowMap = new HashMap<>(16); |
||||
|
ResultSetMetaData metaData = resultSet.getMetaData(); |
||||
|
int columnCount = metaData.getColumnCount(); |
||||
|
for (int i = 1; i <= columnCount; i++) { |
||||
|
String columnName = metaData.getColumnName(i); |
||||
|
Object columnValue = resultSet.getObject(i); |
||||
|
rowMap.put(columnName, columnValue); |
||||
|
} |
||||
|
// 判断是否是最后一条数据 |
||||
|
if (currentRow == totalRows) { |
||||
|
rowMap.put("isLast", 1); |
||||
|
} |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
results.put("results", JSON.toJSONString(rowMap)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(rowMap)); |
||||
|
rowCount++; |
||||
|
} |
||||
|
|
||||
|
if (rowCount == 0) { |
||||
|
log.info("查询结果为空"); |
||||
|
Map<String, Object> rowMap = new HashMap<>(16); |
||||
|
for (Map.Entry<String, Object> entry : output.entrySet()) { |
||||
|
rowMap.put(entry.getKey(), ""); |
||||
|
} |
||||
|
rowMap.put("isLast",1); |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
results.put("results", JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
} |
||||
|
if (batchSize > resultSet.getRow()) { |
||||
|
break; |
||||
|
} |
||||
|
offset += batchSize; |
||||
|
} catch (Throwable e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("执行查询并推送失败", e); |
||||
|
break; |
||||
|
} |
||||
|
} |
||||
|
log.info("查询结果处理完成,rowCount:" + rowCount); |
||||
|
// 关闭连接和语句 |
||||
|
// resultSet.close(); |
||||
|
// 如果当前结果集小于批处理大小,则表示已经到达末尾 |
||||
|
// 增加偏移量,准备处理下一批数据 |
||||
|
} catch (Throwable e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("执行查询失败", e); |
||||
|
|
||||
|
} |
||||
|
// preparedStatement.close(); |
||||
|
// connection.close(); |
||||
|
} catch (Throwable e) { |
||||
|
log.error("查询失败", e); |
||||
|
e.printStackTrace(); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void queryField(JSONObject jsonObject) { |
||||
|
//输入 |
||||
|
JSONObject input = jsonObject.getJSONObject("input"); |
||||
|
log.info("queryField ---> input:" + JSON.toJSONString(input)); |
||||
|
//数据库信息 |
||||
|
JSONObject conn_info = input.getJSONObject("conn_info"); |
||||
|
//场景id |
||||
|
Integer scenes_id = jsonObject.getInteger("scenes_id"); |
||||
|
//构建查询语句 |
||||
|
StringBuilder sqlBuilder = new StringBuilder(" SHOW COLUMNS FROM ").append(conn_info.getString("table")); |
||||
|
String sql = sqlBuilder.toString(); |
||||
|
// System.out.println("Generated QueryField SQL: " + sql); |
||||
|
log.info("Generated QueryField SQL: " + sql); |
||||
|
try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id); |
||||
|
PreparedStatement preparedStatement = connection.prepareStatement(sql); |
||||
|
ResultSet resultSet = preparedStatement.executeQuery()) { |
||||
|
Map fieldMap = new LinkedHashMap(); |
||||
|
while (resultSet.next()) { |
||||
|
String columnName = resultSet.getString("Field"); |
||||
|
// System.out.println(columnName); |
||||
|
fieldMap.put(columnName, columnName); |
||||
|
} |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
fieldMap.put("isLast",1); |
||||
|
results.put("results", JSON.toJSONString(fieldMap)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
|
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(fieldMap)); |
||||
|
|
||||
|
} catch (Throwable e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("查询表字段失败", e); |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void sqlImpl(JSONObject jsonObject) { |
||||
|
//输入 |
||||
|
JSONObject input = jsonObject.getJSONObject("input"); |
||||
|
//输出 |
||||
|
JSONObject output = jsonObject.getJSONObject("output"); |
||||
|
|
||||
|
log.info("queryData ---> input:" + JSON.toJSONString(input)); |
||||
|
log.info("queryData ---> output:" + JSON.toJSONString(output)); |
||||
|
|
||||
|
//数据库信息 |
||||
|
JSONObject conn_info = input.getJSONObject("conn_info"); |
||||
|
//场景id |
||||
|
Integer scenes_id = jsonObject.getInteger("scenes_id"); |
||||
|
|
||||
|
JSONObject data = jsonObject.getJSONObject("data"); |
||||
|
log.info("queryData ---> data:" + JSON.toJSONString(data)); |
||||
|
//执行sql拼接 |
||||
|
JSONArray prompt = input.getJSONArray("prompt"); |
||||
|
StringBuilder sqlBuilder = new StringBuilder(); |
||||
|
prompt.forEach(value -> { |
||||
|
// JSONObject item = (JSONObject) value; |
||||
|
JSONObject item = JSONObject.parseObject(JSON.toJSONString(value)); |
||||
|
//type=1文本内容 |
||||
|
int two = 2; |
||||
|
//type=2变量值 |
||||
|
Integer type = item.getInteger("type"); |
||||
|
if (type == 1) { |
||||
|
sqlBuilder.append(item.getString("value")); |
||||
|
} else if (type == two) { |
||||
|
String[] fieldValue_split = item.getString("value").split(":"); |
||||
|
JSONObject fieldData = data.getJSONObject(fieldValue_split[0]); |
||||
|
sqlBuilder.append(JSONPath.eval(fieldData, fieldValue_split[1])); |
||||
|
} |
||||
|
}); |
||||
|
String sql = sqlBuilder.toString(); |
||||
|
// System.out.println("Generated sqlImpl SQL: " + sql); |
||||
|
log.info("Generated sqlImpl SQL: " + sql); |
||||
|
//查询语句 |
||||
|
Set<String> strings = output.keySet(); |
||||
|
if (sql.toLowerCase().contains(Constant.SELECT)) { |
||||
|
if (jsonObject.containsKey(Constant.TRACE) && jsonObject.getBoolean(Constant.TRACE)==true){ |
||||
|
log.info("测试流程,限制最多1条"); |
||||
|
sqlBuilder.append(" LIMIT 1"); |
||||
|
} |
||||
|
try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id); |
||||
|
PreparedStatement preparedStatement = connection.prepareStatement(sql); |
||||
|
ResultSet resultSet = preparedStatement.executeQuery()) { |
||||
|
int fieldType = input.getInteger("fieldType"); |
||||
|
if (fieldType == 0) { |
||||
|
List<Map> dataMapList = new ArrayList<>(); |
||||
|
while (resultSet.next()) { |
||||
|
Map dataMap = new HashMap(16); |
||||
|
ResultSetMetaData metaData = resultSet.getMetaData(); |
||||
|
int columnCount = metaData.getColumnCount(); |
||||
|
for (int i = 1; i <= columnCount; i++) { |
||||
|
String columnName = metaData.getColumnLabel(i); |
||||
|
Object columnValue = resultSet.getObject(i); |
||||
|
dataMap.put(columnName, columnValue); |
||||
|
} |
||||
|
dataMapList.add(dataMap); |
||||
|
} |
||||
|
Map contentMap = new HashMap<>(16); |
||||
|
contentMap.put("content", dataMapList); |
||||
|
contentMap.put("isLast",1); |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
results.put("results", JSON.toJSONString(contentMap)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(contentMap)); |
||||
|
return; |
||||
|
} |
||||
|
int rowCount = 0; |
||||
|
// 获取总行数 |
||||
|
resultSet.last(); |
||||
|
int totalRows = resultSet.getRow(); |
||||
|
resultSet.beforeFirst(); |
||||
|
int currentRow = 0; |
||||
|
while (resultSet.next()) { |
||||
|
currentRow++; |
||||
|
Map<String, Object> rowMap = new HashMap<>(16); |
||||
|
ResultSetMetaData metaData = resultSet.getMetaData(); |
||||
|
int columnCount = metaData.getColumnCount(); |
||||
|
for (int i = 1; i <= columnCount; i++) { |
||||
|
String columnName = metaData.getColumnLabel(i); |
||||
|
Object columnValue = resultSet.getObject(i); |
||||
|
log.info("列名:"+columnName+",value:"+columnValue); |
||||
|
if (strings.contains(columnName)) { |
||||
|
rowMap.put(columnName, columnValue); |
||||
|
} |
||||
|
} |
||||
|
// 判断是否是最后一条数据 |
||||
|
if (currentRow == totalRows) { |
||||
|
rowMap.put("isLast", 1); |
||||
|
} |
||||
|
rowMap.put("is_diffusion",true); |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
results.put("results", JSON.toJSONString(rowMap)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(rowMap)); |
||||
|
rowCount++; |
||||
|
} |
||||
|
if (rowCount == 0) { |
||||
|
log.info("查询结果为空"); |
||||
|
Map<String, Object> rowMap = new HashMap<>(16); |
||||
|
for (Map.Entry<String, Object> entry : output.entrySet()) { |
||||
|
rowMap.put(entry.getKey(), ""); |
||||
|
} |
||||
|
rowMap.put("isLast", 1); |
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
results.put("results", JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSONObject.toJSONString(jsonObject, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue)); |
||||
|
} |
||||
|
|
||||
|
} catch (Throwable e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("执行失败", e); |
||||
|
} |
||||
|
} else { |
||||
|
int rowsInserted = 0; |
||||
|
try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id); |
||||
|
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { |
||||
|
rowsInserted = preparedStatement.executeUpdate(); |
||||
|
|
||||
|
log.info("执行成功,result:" + rowsInserted); |
||||
|
|
||||
|
Map<Object, Object> results = new HashMap<>(16); |
||||
|
Map contentMap = new HashMap<>(16); |
||||
|
contentMap.put("content", rowsInserted); |
||||
|
contentMap.put("isLast", 1); |
||||
|
results.put("results", JSON.toJSONString(contentMap)); |
||||
|
results.put("status",1); |
||||
|
results.put("message","成功"); |
||||
|
jsonObject.put("result", results); |
||||
|
KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); |
||||
|
log.info("推送成功,result:" + JSON.toJSONString(contentMap)); |
||||
|
} catch (Throwable e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("执行失败", e); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public static void main(String[] args) { |
||||
|
// String formJson = "{\"form\": [" + |
||||
|
// "{\"field\": \"subject_id\",\"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f0']\"}," + |
||||
|
// "{\"field\": \"id\", \"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f1']\"}," + |
||||
|
// "{\"field\": \"name\", \"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f2']\"}," + |
||||
|
// "{\"field\": \"name_en\", \"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f2']\"}," + |
||||
|
// "{\"field\": \"publication_num\", \"dataType\": \"integer\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f3']\"}," + |
||||
|
// "{\"field\": \"popular_paper_rate\", \"dataType\": \"double\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f5']\"}," + |
||||
|
// "{\"field\": \"top_paper_rate\", \"dataType\": \"double\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f6']\"}," + |
||||
|
// "{\"field\": \"h_index\", \"dataType\": \"double\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f7']\"}," + |
||||
|
// "{\"field\": \"affiliation\", \"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f8']\"}," + |
||||
|
// "{\"field\": \"nationality\", \"dataType\": \"keyword\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['f9']\"}," + |
||||
|
// "{\"field\": \"citation_num\", \"dataType\": \"integer\",\"value\": \"5013b99b-61a9-49dd-a571-66516f86f5d9:$['10']\"}" + |
||||
|
// "]}"; |
||||
|
// String formDataJson = "{\"5013b99b-61a9-49dd-a571-66516f86f5d9\": {\"f0\": \"subject_id_value\", \"f1\": \"id_value\", " + |
||||
|
// "\"f2\": \"name_value\", \"f3\": 123, \"f5\": 12.34, \"f6\": 56.78, \"f7\": 9.0, \"f8\": \"affiliation_value\", " + |
||||
|
// "\"f9\": \"nationality_value\", \"10\": 456}}"; |
||||
|
// JSONObject data = JSONObject.parseObject(formDataJson); |
||||
|
// JSONObject fromdata = JSONObject.parseObject(formJson); |
||||
|
// JSONArray from = fromdata.getJSONArray("form"); |
||||
|
// |
||||
|
// StringBuilder sqlBuilder = new StringBuilder(); |
||||
|
// sqlBuilder.append("INSERT INTO ").append("sssss").append(" ("); |
||||
|
// StringBuilder valuesBuilder = new StringBuilder(") VALUES ("); |
||||
|
// |
||||
|
// from.forEach(value -> { |
||||
|
// JSONObject item = (JSONObject) value; |
||||
|
// String fieldName = item.getString("field"); |
||||
|
// String fieldValue = item.getString("value"); |
||||
|
// String[] fieldValue_split = fieldValue.split(":"); |
||||
|
// JSONObject fieldData = data.getJSONObject(fieldValue_split[0]); |
||||
|
// //.getString(fieldValue_split[1]); |
||||
|
// fieldValue = JSONPath.eval(fieldData, fieldValue_split[1]).toString(); |
||||
|
// sqlBuilder.append(fieldName).append(", "); |
||||
|
// valuesBuilder.append(fieldValue).append(", "); |
||||
|
// }); |
||||
|
// // 删除最后一个逗号和空格 |
||||
|
// sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length()); |
||||
|
// valuesBuilder.delete(valuesBuilder.length() - 2, valuesBuilder.length()); |
||||
|
// |
||||
|
// sqlBuilder.append(valuesBuilder).append(")"); |
||||
|
// String sql = sqlBuilder.toString(); |
||||
|
// System.out.println("Generated SQL: " + sql); |
||||
|
String fieldValue = "2"; |
||||
|
String[] fieldValue_split = fieldValue.split(":"); |
||||
|
System.out.println(fieldValue_split.length); |
||||
|
|
||||
|
} |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.scheduling.annotation.AsyncConfigurer; |
||||
|
import org.springframework.scheduling.annotation.EnableAsync; |
||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
||||
|
|
||||
|
import java.util.concurrent.Executor; |
||||
|
|
||||
|
|
||||
|
@Configuration |
||||
|
@EnableAsync //Java配置文件标注它,那么Spring就会开启异步可用 |
||||
|
/** |
||||
|
* @author guowei |
||||
|
* 异步任务线程池 |
||||
|
* 注解@EnableAsync代表开启Spring异步。这样就可以使用@Async驱动Spring使用异步, |
||||
|
* 但是异步需要提供可用线程池,所以这里的配置类还会实现AsyncConfigurer接口,然后覆盖getAsyncExecutor方法,这样就可以自定义一个线程池 |
||||
|
*/ |
||||
|
public class AsyncConfig implements AsyncConfigurer { |
||||
|
|
||||
|
@Override |
||||
|
public Executor getAsyncExecutor() { |
||||
|
//定义线程池 |
||||
|
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); |
||||
|
//核心线程数 |
||||
|
threadPoolTaskExecutor.setCorePoolSize(10); |
||||
|
//线程池最大线程数 |
||||
|
threadPoolTaskExecutor.setMaxPoolSize(50); |
||||
|
//线程队列最大线程数 |
||||
|
threadPoolTaskExecutor.setQueueCapacity(200); |
||||
|
//初始化 |
||||
|
threadPoolTaskExecutor.initialize(); |
||||
|
|
||||
|
return threadPoolTaskExecutor; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,65 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import java.util.Map; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSON; |
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import com.alibaba.fastjson2.JSONPath; |
||||
|
import com.bfd.dataprocess.config.Constant; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* @author:jinming |
||||
|
* @className:DataUtil |
||||
|
* @version:1.0 |
||||
|
* @description: 获取dataValue的值 |
||||
|
* @Date:2023/11/1 9:54 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class DataUtil { |
||||
|
/** |
||||
|
* @param key 传入的key |
||||
|
* @param dataMap 数据map |
||||
|
* @return 根据传入的参数进行判断解析,返回正确的dataValue |
||||
|
*/ |
||||
|
public static Object getValue(String key, Map dataMap) { |
||||
|
try { |
||||
|
//公式为空直接就返回 |
||||
|
if (key.equals(Constant.EMPTY)) { |
||||
|
return Constant.EMPTY; |
||||
|
} |
||||
|
if (!key.contains(Constant.NOT_KEY)) { |
||||
|
return key; |
||||
|
} |
||||
|
Object dataValue; |
||||
|
String isJson = "#json#"; |
||||
|
if (key.contains(isJson)) { |
||||
|
//进行第一次拆分,获取#json#前面的部分 |
||||
|
String[] keySplit = key.split(isJson); |
||||
|
String firstDataKey = keySplit[0]; |
||||
|
String[] firstDataKeySplit = firstDataKey.split(":"); |
||||
|
//取出前半部分对应的JSON数据并转换为JSONObject |
||||
|
String dataJson = (String) dataMap.get(firstDataKeySplit[0]); |
||||
|
JSONObject dataJsonObject = JSON.parseObject(dataJson); |
||||
|
//根据key的后半部分取出对应JSONObject中的值 |
||||
|
String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]); |
||||
|
String secDataKey = keySplit[1]; |
||||
|
JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson); |
||||
|
dataValue = JSONPath.eval(firstDataJsonObject, secDataKey); |
||||
|
return dataValue; |
||||
|
} |
||||
|
String[] keySplit = key.split(":"); |
||||
|
String jsonPath = keySplit[1]; |
||||
|
String dataJson = (String) dataMap.get(keySplit[0]); |
||||
|
JSONObject dataJsonObject = JSON.parseObject(dataJson); |
||||
|
dataValue = JSONPath.eval(dataJsonObject, jsonPath); |
||||
|
return dataValue; |
||||
|
} catch (Exception e) { |
||||
|
// TODO: handle exception |
||||
|
log.error("jsonpath公式取值异常,", e); |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
} |
@ -0,0 +1,105 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import com.zaxxer.hikari.HikariDataSource; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.sql.DataSource; |
||||
|
import java.sql.Connection; |
||||
|
import java.sql.SQLException; |
||||
|
import java.util.HashMap; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
*/ |
||||
|
@Component |
||||
|
@Slf4j |
||||
|
public class DynamicDataSource extends AbstractRoutingDataSource { |
||||
|
|
||||
|
public static Map<Object, Object> targetDataSources = new HashMap<>(); |
||||
|
|
||||
|
@Override |
||||
|
protected Object determineCurrentLookupKey() { |
||||
|
// 这里可以根据具体需求来确定使用哪个数据源,例如根据请求参数等 |
||||
|
// 默认数据源的标识,可以根据需要修改 |
||||
|
return "default"; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void afterPropertiesSet() { |
||||
|
setTargetDataSources(targetDataSources); |
||||
|
// 设置默认数据源 |
||||
|
setDefaultTargetDataSource(targetDataSources.get("default")); |
||||
|
super.afterPropertiesSet(); |
||||
|
} |
||||
|
|
||||
|
public void addTargetDataSource(Integer key, DataSource dataSource) { |
||||
|
targetDataSources.put(key, dataSource); |
||||
|
setTargetDataSources(targetDataSources); |
||||
|
afterPropertiesSet(); |
||||
|
} |
||||
|
|
||||
|
public Connection getConnection(JSONObject connInfo,int scenes_id) throws SQLException { |
||||
|
//2024.6.28 有需求链接海外数据库,添加时区 |
||||
|
String dbUrl = "jdbc:mysql://" + connInfo.get("hostname") + ":" + connInfo.get("port") + "/" + connInfo.get("database")+"?serverTimezone=Asia/Shanghai"; |
||||
|
DataSource dataSource = null; |
||||
|
synchronized (targetDataSources) { |
||||
|
if (!targetDataSources.containsKey(scenes_id)) { |
||||
|
log.info("数据源为空或失效,创建新的数据源,dbUrl:" + dbUrl); |
||||
|
// 创建新的数据源,并添加到 targetDataSources 中 |
||||
|
String dbUsername = connInfo.getString("username"); |
||||
|
String dbPassword = connInfo.getString("password"); |
||||
|
dataSource = createDataSource(dbUrl, dbUsername, dbPassword); |
||||
|
addTargetDataSource(scenes_id, dataSource); |
||||
|
} else { |
||||
|
dataSource = (DataSource) targetDataSources.get(scenes_id); |
||||
|
} |
||||
|
} |
||||
|
return dataSource.getConnection(); |
||||
|
} |
||||
|
|
||||
|
private boolean isValid(DataSource dataSource) { |
||||
|
try (Connection connection = dataSource.getConnection()) { |
||||
|
// 通过尝试获取连接来检查连接池是否有效 |
||||
|
return true; |
||||
|
} catch (SQLException e) { |
||||
|
// 连接失败,说明连接池已失效 |
||||
|
return false; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 创建新的数据源方法,这里需要根据具体情况实现 |
||||
|
*/ |
||||
|
private DataSource createDataSource(String dbUrl, String dbUsername, String dbPassword) { |
||||
|
// // 这里可以使用 Spring Boot 的 DataSourceBuilder 来创建数据源 |
||||
|
// return DataSourceBuilder |
||||
|
// .create() |
||||
|
// .url(dbUrl) |
||||
|
// .username(dbUsername) |
||||
|
// .password(dbPassword) |
||||
|
// .build(); |
||||
|
HikariDataSource dataSource = new HikariDataSource(); |
||||
|
dataSource.setJdbcUrl(dbUrl); |
||||
|
dataSource.setUsername(dbUsername); |
||||
|
dataSource.setPassword(dbPassword); |
||||
|
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); |
||||
|
// 设置连接池大小 |
||||
|
dataSource.setMaximumPoolSize(5); |
||||
|
dataSource.setConnectionTestQuery("SELECT 1"); |
||||
|
// 设置连接池中连接的过期时间 |
||||
|
// 10分钟 |
||||
|
dataSource.setIdleTimeout(600000); |
||||
|
// 30分钟 |
||||
|
dataSource.setMaxLifetime(1800000); |
||||
|
// 30秒 |
||||
|
dataSource.setConnectionTimeout(30000); |
||||
|
// 设置连接泄露检测阈值为 1 分钟 |
||||
|
dataSource.setLeakDetectionThreshold(60000); |
||||
|
return dataSource; |
||||
|
} |
||||
|
} |
||||
|
|
@ -0,0 +1,81 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.kafka.clients.producer.KafkaProducer; |
||||
|
import org.apache.kafka.clients.producer.ProducerRecord; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.Properties; |
||||
|
|
||||
|
/** |
||||
|
* @author guowei |
||||
|
* kfk工具类 |
||||
|
*/ |
||||
|
@Component |
||||
|
@Slf4j |
||||
|
public class KfkUtil { |
||||
|
private static String topic; |
||||
|
|
||||
|
private static String brokerList; |
||||
|
|
||||
|
@Value("${crawl.kafka.topic}") |
||||
|
public void setTopic(String topic) { |
||||
|
KfkUtil.topic = topic; |
||||
|
} |
||||
|
|
||||
|
@Value("${crawl.kafka.brokers}") |
||||
|
public void setBrokerList(String brokerList) { |
||||
|
KfkUtil.brokerList = brokerList; |
||||
|
} |
||||
|
private static KafkaProducer<String, String> kafkaProducer; |
||||
|
|
||||
|
public static int num = 0; |
||||
|
|
||||
|
/** |
||||
|
* 获取KafkaProducer实例 |
||||
|
*/ |
||||
|
public static KafkaProducer<String, String> getProducer() { |
||||
|
if (kafkaProducer == null) { |
||||
|
Properties props = new Properties(); |
||||
|
//xxx服务器ip |
||||
|
props.put("bootstrap.servers", brokerList); |
||||
|
//所有follower都响应了才认为消息提交成功,即"committed" |
||||
|
props.put("acks", "all"); |
||||
|
//retries = MAX 无限重试,直到你意识到出现了问题:) |
||||
|
props.put("retries", 3); |
||||
|
//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 |
||||
|
props.put("batch.size", 16384); |
||||
|
//batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms |
||||
|
//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 |
||||
|
props.put("linger.ms", 1); |
||||
|
//producer可以用来缓存数据的内存大小。 |
||||
|
props.put("buffer.memory", 33554432); |
||||
|
props.put("key.serializer", |
||||
|
"org.apache.kafka.common.serialization.StringSerializer"); |
||||
|
props.put("value.serializer", |
||||
|
"org.apache.kafka.common.serialization.StringSerializer"); |
||||
|
kafkaProducer = new KafkaProducer<String, String>(props); |
||||
|
} |
||||
|
return kafkaProducer; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 关闭KafkaProducer实例 |
||||
|
*/ |
||||
|
public static void closeProducer() { |
||||
|
if (kafkaProducer != null) { |
||||
|
log.info("----------close producer----------"); |
||||
|
kafkaProducer.close(); |
||||
|
kafkaProducer = null; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public static void sendKafka(String resultData) { |
||||
|
KafkaProducer<String, String> producer = getProducer(); |
||||
|
ProducerRecord<String, String> se = new ProducerRecord<String, String>(topic, resultData); |
||||
|
producer.send(se); |
||||
|
log.info("发送kafka成功"); |
||||
|
// num++; |
||||
|
} |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import org.apache.curator.framework.CuratorFramework; |
||||
|
import org.apache.curator.framework.CuratorFrameworkFactory; |
||||
|
import org.apache.curator.retry.ExponentialBackoffRetry; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
/** |
||||
|
* @author jian.mao |
||||
|
* @date 2024年4月16日 |
||||
|
* @description |
||||
|
*/ |
||||
|
@Configuration |
||||
|
public class ZookeeperConfig { |
||||
|
@Value("${zookeeper.connection-string}") |
||||
|
private String connectionString; |
||||
|
@Bean |
||||
|
public CuratorFramework curatorFramework() { |
||||
|
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); |
||||
|
curatorFramework.start(); |
||||
|
return curatorFramework; |
||||
|
} |
||||
|
} |
@ -0,0 +1,65 @@ |
|||||
|
package com.bfd.dataprocess.util; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import com.bfd.dataprocess.config.Constant; |
||||
|
import com.zaxxer.hikari.HikariDataSource; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.curator.framework.CuratorFramework; |
||||
|
import org.apache.curator.framework.recipes.cache.NodeCache; |
||||
|
import org.apache.curator.framework.recipes.cache.NodeCacheListener; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.annotation.PostConstruct; |
||||
|
|
||||
|
/** |
||||
|
* @author jian.mao |
||||
|
* @date 2024年4月17日 |
||||
|
* @description |
||||
|
*/ |
||||
|
@Component |
||||
|
@Slf4j |
||||
|
public class ZookeeperNodeMonitor { |
||||
|
|
||||
|
@Autowired |
||||
|
private CuratorFramework curatorFramework; |
||||
|
|
||||
|
@Value("${zookeeper.publish-node}") |
||||
|
private String nodePath; |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void init() { |
||||
|
try { |
||||
|
// 创建节点监听器 |
||||
|
NodeCache nodeCache = new NodeCache(curatorFramework, nodePath); |
||||
|
nodeCache.start(); |
||||
|
|
||||
|
// 监听节点变化 |
||||
|
nodeCache.getListenable().addListener(new NodeCacheListener() { |
||||
|
@Override |
||||
|
public void nodeChanged() throws Exception { |
||||
|
byte[] data = nodeCache.getCurrentData().getData(); |
||||
|
String nodeData = new String(data); |
||||
|
System.out.println("Node data changed: " + nodeData); |
||||
|
log.info("Node data changed: " + nodeData); |
||||
|
JSONObject zkData = JSONObject.parseObject(nodeData); |
||||
|
if(zkData.getString(Constant.OPERATION).equals(Constant.STOP)){ |
||||
|
log.info("触发停止,尝试关闭连接池"); |
||||
|
Integer scenes_id = zkData.getInteger("scenes_id"); |
||||
|
// 在这里处理节点数据变化的逻辑,比如通知其他组件或者执行相应的操作 |
||||
|
if (DynamicDataSource.targetDataSources.containsKey(scenes_id)) { |
||||
|
HikariDataSource dataSource = (HikariDataSource) DynamicDataSource.targetDataSources.get(scenes_id); |
||||
|
dataSource.close(); |
||||
|
DynamicDataSource.targetDataSources.remove(scenes_id); |
||||
|
log.info("场景:" + scenes_id + ",连接池关闭"); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}); |
||||
|
} catch (Exception e) { |
||||
|
e.printStackTrace(); |
||||
|
// 异常处理 |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
crawl: |
||||
|
kafka: |
||||
|
topic: produce_analyze |
||||
|
brokers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092 |
||||
|
server: |
||||
|
port: 9392 |
||||
|
#日志级别 |
||||
|
logging: |
||||
|
level: |
||||
|
com: |
||||
|
bfd: INFO |
||||
|
#日志路径 |
||||
|
log: |
||||
|
path: ./logs |
||||
|
spring: |
||||
|
boot: |
||||
|
admin: |
||||
|
client: |
||||
|
url: http://172.18.1.147:8001 |
||||
|
instance: |
||||
|
service-base-url: http://172.18.1.147:9392 |
||||
|
application: |
||||
|
name: mysql_app |
||||
|
management: |
||||
|
endpoints: |
||||
|
web: |
||||
|
exposure: |
||||
|
include: "*" |
||||
|
endpoint: |
||||
|
health: |
||||
|
show-details: always |
||||
|
health: |
||||
|
elasticsearch: |
||||
|
enabled: false |
||||
|
zookeeper: |
||||
|
connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181 |
||||
|
publish-node: /analyze |
@ -0,0 +1,38 @@ |
|||||
|
<configuration> |
||||
|
<!-- 属性文件:在properties文件中找到对应的配置项 --> |
||||
|
<springProperty scope="context" name="logging.path" source="logging.log.path"/> |
||||
|
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/> |
||||
|
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 --> |
||||
|
<!-- <appender name="STDOUT" |
||||
|
class="ch.qos.logback.core.ConsoleAppender"> |
||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> |
||||
|
<Pattern>%d{HH:mm:ss.SSS} %-5level %logger{80} - %msg%n</Pattern> |
||||
|
</encoder> |
||||
|
</appender> --> |
||||
|
|
||||
|
<appender name="GLMAPPER-LOGGERONE" |
||||
|
class="ch.qos.logback.core.rolling.RollingFileAppender"> |
||||
|
<append>true</append> |
||||
|
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> |
||||
|
<level>${logging.level}</level> |
||||
|
</filter> |
||||
|
<file> |
||||
|
${logging.path}/crawlSchedule.log |
||||
|
<!-- ${logging.path}/sendKafka.log --> |
||||
|
</file> |
||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> |
||||
|
<FileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</FileNamePattern> |
||||
|
<!-- <FileNamePattern>${logging.path}/sendKafka.log.%d{yyyy-MM-dd}</FileNamePattern> --> |
||||
|
<MaxHistory>7</MaxHistory> |
||||
|
</rollingPolicy> |
||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> |
||||
|
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern> |
||||
|
<charset>UTF-8</charset> |
||||
|
</encoder> |
||||
|
</appender> |
||||
|
|
||||
|
<root level="info"> |
||||
|
<appender-ref ref="GLMAPPER-LOGGERONE"/> |
||||
|
<!-- <appender-ref ref="STDOUT"/> --> |
||||
|
</root> |
||||
|
</configuration> |
Write
Preview
Loading…
Cancel
Save
Reference in new issue