commit 56309d44e01181628bd6779cd49081abe6707b97
Author: 55007 <55007@maojian>
Date:   Tue Jan 7 18:15:34 2025 +0800

    MySQL application (mysql应用)

diff --git a/.classpath b/.classpath
new file mode 100644
index 0000000..f7e4a1d
--- /dev/null
+++ b/.classpath
@@ -0,0 +1,40 @@
+<!-- Eclipse JDT/m2e classpath entries (markup not recovered) -->
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..58da1f7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/logs/
diff --git a/.project b/.project
new file mode 100644
index 0000000..92491e5
--- /dev/null
+++ b/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+    <name>mysql_app</name>
+    <comment></comment>
+    <projects>
+    </projects>
+    <buildSpec>
+        <buildCommand>
+            <name>org.eclipse.jdt.core.javabuilder</name>
+            <arguments>
+            </arguments>
+        </buildCommand>
+        <buildCommand>
+            <name>org.eclipse.m2e.core.maven2Builder</name>
+            <arguments>
+            </arguments>
+        </buildCommand>
+    </buildSpec>
+    <natures>
+        <nature>org.eclipse.jdt.core.javanature</nature>
+        <nature>org.eclipse.m2e.core.maven2Nature</nature>
+    </natures>
+</projectDescription>
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index 0000000..abdea9a
--- /dev/null
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,4 @@
+eclipse.preferences.version=1
+encoding//src/main/java=UTF-8
+encoding//src/main/resources=UTF-8
+encoding/<project>=UTF-8
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..71df522
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,9 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
+org.eclipse.jdt.core.compiler.release=disabled
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs
new file mode 100644
index 0000000..f897a7f
--- /dev/null
+++ b/.settings/org.eclipse.m2e.core.prefs
@@ -0,0 +1,4 @@
+activeProfiles=
+eclipse.preferences.version=1
+resolveWorkspaceProjects=true
+version=1
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c7de53a
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+MySQL application, Java version (mysql应用 java版本)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..64d2c41
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.4.RELEASE</version>
+    </parent>
+
+    <groupId>com.bfd</groupId>
+    <artifactId>data_process</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fastjson2</groupId>
+            <artifactId>fastjson2</artifactId>
+            <version>2.0.12</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.8.27</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>de.codecentric</groupId>
+            <artifactId>spring-boot-admin-client</artifactId>
+            <version>2.2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>5.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>5.2.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>*.properties</exclude>
+                        <exclude>*.yml</exclude>
+                        <exclude>*.yaml</exclude>
+                    </excludes>
+                    <archive>
+                        <manifest>
+                            <mainClass>com.bfd.dataprocess.DataProcessApplication</mainClass>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                            <useUniqueVersions>false</useUniqueVersions>
+                        </manifest>
+                        <manifestEntries>
+                            <Class-Path>config/</Class-Path>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-resources</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <resources>
+                                <resource>
+                                    <directory>src/main/resources/</directory>
+                                    <includes>
+                                        <include>*.properties</include>
+                                        <include>*.yml</include>
+                                        <include>*.yaml</include>
+                                    </includes>
+                                </resource>
+                            </resources>
+                            <outputDirectory>${project.build.directory}/config</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/bfd/dataprocess/DataProcessApplication.java b/src/main/java/com/bfd/dataprocess/DataProcessApplication.java
new file mode 100644
index 0000000..ee57b94
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/DataProcessApplication.java
@@ -0,0 +1,23 @@
+package com.bfd.dataprocess;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+
+/**
+ * @author guowei
+ */
+@SpringBootApplication
+@Slf4j
+public class DataProcessApplication {
+    public static void main(String[] args) {
+        ConfigurableApplicationContext applicationContext = SpringApplication.run(DataProcessApplication.class, args);
+        applicationContext.getBean(DataProcessApplication.class).start();
+    }
+
+    public void start() {
+        // intentionally empty: all work is triggered through the REST API
+    }
+}
diff --git a/src/main/java/com/bfd/dataprocess/config/Constant.java b/src/main/java/com/bfd/dataprocess/config/Constant.java
new file mode 100644
index 0000000..056fd89
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/config/Constant.java
@@ -0,0 +1,32 @@
+package com.bfd.dataprocess.config;
+
+/**
+ * Shared constants.
+ *
+ * @author jinming
+ * @version 1.0
+ * @since 2023/8/16 15:26
+ */
+public class Constant {
+    /**
+     * Empty-string constant
+     */
+    public static final String EMPTY = "";
+
+    /**
+     * Marker substring; keys without it are returned by DataUtil unparsed
+     */
+    public static final String NOT_KEY = ":$";
+
+    public final static String STOP = "stop";
+
+    public final static String OR = "or";
+
+    public final static String SELECT = "select";
+
+    public final static String OPERATION = "operation";
+
+    /**
+     * Back-tick used to quote identifiers against reserved words
+     */
+    public final static String CHAR = "`";
+
+    public final static String TRACE = "trace";
+}
\ No newline at end of file
diff --git a/src/main/java/com/bfd/dataprocess/controller/ApiController.java b/src/main/java/com/bfd/dataprocess/controller/ApiController.java
new file mode 100644
index 0000000..0682239
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/controller/ApiController.java
@@ -0,0 +1,64 @@
+package com.bfd.dataprocess.controller;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.bfd.dataprocess.service.ProcessService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+
+/**
+ * @author guowei
+ */
+@RestController
+@Slf4j
+@RequestMapping(value = "/api")
+@CrossOrigin(origins = "*", maxAge = 3600)
+public class ApiController {
+
+    @Resource
+    ProcessService processService;
+
+    /**
+     * Insert API.
+     *
+     * @param jsonObject request payload
+     * @return always "success"; the real outcome is pushed to Kafka
+     */
+    @RequestMapping(value = "/insertData", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public String insertData(@RequestBody JSONObject jsonObject) {
+        processService.insertData(jsonObject);
+        return "success";
+    }
+
+    /**
+     * Query API.
+     *
+     * @param jsonObject request payload
+     * @return always "success"; rows are pushed to Kafka asynchronously
+     */
+    @RequestMapping(value = "/queryData", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public String queryData(@RequestBody JSONObject jsonObject) {
+        processService.queryData(jsonObject);
+        return "success";
+    }
+
+    /**
+     * Query-field API (lists the columns of a table).
+     *
+     * @param jsonObject request payload
+     * @return always "success"; the field list is pushed to Kafka
+     */
+    @RequestMapping(value = "/queryField", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public String queryField(@RequestBody JSONObject jsonObject) {
+        processService.queryField(jsonObject);
+        return "success";
+    }
+
+    @RequestMapping(value = "/sqlExecution", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public String sqlImpl(@RequestBody JSONObject jsonObject) {
+        processService.sqlImpl(jsonObject);
+        return "success";
+    }
+}
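For orientation, every endpoint above hands the raw JSON body to ProcessService and answers "success" immediately; the actual result travels over Kafka. A minimal client sketch for the insert endpoint follows. The payload shape mirrors the keys that ProcessServiceImpl.insertData() reads (scenes_id, input.conn_info, input.form, data), but every host, table, field and value in it is hypothetical:

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

public class InsertDataClientSketch {
    public static void main(String[] args) {
        // Hypothetical payload; "node-1:$['name']" is resolved against the
        // "data" section via JSONPath, exactly as insertData() does it.
        String body = "{"
                + "\"scenes_id\":1,"
                + "\"input\":{"
                +   "\"conn_info\":{\"hostname\":\"127.0.0.1\",\"port\":3306,\"database\":\"demo\","
                +     "\"username\":\"root\",\"password\":\"secret\",\"table\":\"t_user\"},"
                +   "\"form\":[{\"field\":\"name\",\"value\":\"node-1:$['name']\"}]},"
                + "\"data\":{\"node-1\":{\"name\":\"Tom\"}}}";
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        // Port 9392 comes from application.yml; the HTTP answer is always "success".
        String resp = new RestTemplate().postForObject(
                "http://localhost:9392/api/insertData",
                new HttpEntity<>(body, headers), String.class);
        System.out.println(resp);
    }
}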
diff --git a/src/main/java/com/bfd/dataprocess/service/ProcessService.java b/src/main/java/com/bfd/dataprocess/service/ProcessService.java
new file mode 100644
index 0000000..c8349da
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/service/ProcessService.java
@@ -0,0 +1,19 @@
+package com.bfd.dataprocess.service;
+
+import com.alibaba.fastjson2.JSONObject;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author guowei
+ */
+@Service
+public interface ProcessService {
+
+    void insertData(JSONObject jsonObject);
+
+    void queryData(JSONObject jsonObject);
+
+    void queryField(JSONObject jsonObject);
+
+    void sqlImpl(JSONObject jsonObject);
+}
diff --git a/src/main/java/com/bfd/dataprocess/service/impl/ProcessServiceImpl.java b/src/main/java/com/bfd/dataprocess/service/impl/ProcessServiceImpl.java
new file mode 100644
index 0000000..fb65179
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/service/impl/ProcessServiceImpl.java
@@ -0,0 +1,543 @@
+package com.bfd.dataprocess.service.impl;
+
+import cn.hutool.core.util.IdUtil;
+import com.alibaba.fastjson2.*;
+import com.bfd.dataprocess.config.Constant;
+import com.bfd.dataprocess.service.ProcessService;
+import com.bfd.dataprocess.util.DataUtil;
+import com.bfd.dataprocess.util.DynamicDataSource;
+import com.bfd.dataprocess.util.KfkUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.sql.*;
+import java.util.*;
+
+/**
+ * @author guowei
+ */
+@Service
+@Slf4j
+public class ProcessServiceImpl implements ProcessService {
+    @Autowired
+    DynamicDataSource dynamicDataSource;
+
+    @Override
+    public void insertData(JSONObject jsonObject) {
+        Map staMap = new HashMap<>(16);
+        JSONObject data = jsonObject.getJSONObject("data");
+        // input payload
+        JSONObject input = jsonObject.getJSONObject("input");
+        log.info("insertData ---> input:" + JSON.toJSONString(input));
+        log.info("insertData ---> data:" + JSON.toJSONString(data));
+        // database connection info
+        JSONObject conn_info = input.getJSONObject("conn_info");
+        // scene id
+        Integer scenes_id = jsonObject.getInteger("scenes_id");
+        // form fields to insert
+        JSONArray form = input.getJSONArray("form");
+        Map resultMap = new HashMap<>(16);
+        resultMap.put("id", IdUtil.randomUUID());
+        try {
+            // Build the INSERT statement
+            StringBuilder sqlBuilder = new StringBuilder();
+            sqlBuilder.append("INSERT INTO ").append(conn_info.getString("table")).append(" (");
+            StringBuilder valuesBuilder = new StringBuilder(") VALUES (");
+            List valueList = new ArrayList<>();
+            form.forEach(value -> {
+                JSONObject item = JSONObject.parseObject(JSON.toJSONString(value));
+                String fieldName = item.getString("field");
+                String fieldValue = item.get("value").toString();
+                if (fieldValue.isEmpty()) {
+                    log.info("field " + fieldName + " has an empty value, skipping, fieldValue:" + fieldValue);
+                    return;
+                }
+                // back-tick the column name so reserved words are safe
+                sqlBuilder.append(Constant.CHAR).append(fieldName).append(Constant.CHAR).append(", ");
+                String[] fieldValue_split = fieldValue.split(":");
+                if (fieldValue_split.length > 1) {
+                    // "nodeId:$jsonPath" references are resolved against the data section
+                    JSONObject fieldData = data.getJSONObject(fieldValue_split[0]);
+                    fieldValue = JSONPath.eval(fieldData, fieldValue_split[1]).toString();
+                }
+                valuesBuilder.append(" ? ").append(", ");
+                valueList.add(fieldValue);
+            });
+            // Trim the trailing comma and space
+            sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length());
+            valuesBuilder.delete(valuesBuilder.length() - 2, valuesBuilder.length());
+
+            sqlBuilder.append(valuesBuilder).append(")");
+            String sql = sqlBuilder.toString();
+            log.info("Generated SQL: " + sql);
+            // Create the PreparedStatement
+            try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id);
+                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                for (int i = 0; i < valueList.size(); i++) {
+                    // bind parameters in field order
+                    preparedStatement.setObject(i + 1, valueList.get(i).toString());
+                }
+
+                log.info("Final generated INSERT SQL: " + preparedStatement.toString());
+                // Execute the insert
+                int rowsInserted = preparedStatement.executeUpdate();
+                log.info("Rows inserted: " + rowsInserted);
+
+                if (rowsInserted > 0) {
+                    resultMap.put("content", "写入成功");
+                    staMap.put("status", 1);
+                    staMap.put("message", "成功");
+                } else {
+                    resultMap.put("content", "写入失败");
+                    staMap.put("status", 2);
+                    staMap.put("message", "失败");
+                }
+            } catch (SQLException sqlException) {
+                log.error("insert execution failed", sqlException);
+                resultMap.put("content", "失败");
+                staMap.put("status", 2);
+                staMap.put("message", "失败,sql异常");
+            }
+
+        } catch (Throwable e) {
+            log.error("insert error", e);
+            resultMap.put("content", "写入失败");
+            staMap.put("status", 2);
+            staMap.put("message", "失败");
+        }
+        Map results = new HashMap<>(16);
+        resultMap.put("isLast", 1);
+        results.put("results", JSON.toJSONString(resultMap));
+        results.putAll(staMap);
+        jsonObject.put("result", results);
+
+        KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+        log.info("pushed to Kafka, result:" + JSON.toJSONString(resultMap));
+
+    }
+
+    @Async
+    @Override
+    public void queryData(JSONObject jsonObject) {
+        try {
+            Map dataMap = jsonObject.getObject("data", Map.class);
+            // input definition
+            JSONObject input = jsonObject.getJSONObject("input");
+            // output definition
+            JSONObject output = jsonObject.getJSONObject("output");
+            log.info("queryData ---> input:" + JSON.toJSONString(input));
+            log.info("queryData ---> output:" + JSON.toJSONString(output));
+            // database connection info
+            JSONObject conn_info = input.getJSONObject("conn_info");
+            // query condition
+            JSONObject condition = input.getJSONObject("condition");
+            // scene id
+            Integer scenes_id = jsonObject.getInteger("scenes_id");
+            // Build the SELECT statement
+            StringBuilder sqlBuilder = new StringBuilder();
+            sqlBuilder.append("SELECT ");
+            StringBuilder conditionBuilder = new StringBuilder(" WHERE 1=1 and ");
+            // columns to select
+            for (Map.Entry entry : output.entrySet()) {
+                sqlBuilder.append(Constant.CHAR).append(entry.getKey()).append(Constant.CHAR).append(", ");
+            }
+
+            // Trim the trailing comma and space
+            sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length());
+            // table name
+            sqlBuilder.append(" FROM ").append(conn_info.getString("table"));
+            if (condition.containsKey(Constant.OR)) {
+                JSONArray or = condition.getJSONArray("or");
+                or.forEach(value_or -> {
+                    StringBuilder orBuilder = new StringBuilder();
+                    ArrayList item_or = (ArrayList) value_or;
+                    item_or.forEach(value_and -> {
+                        JSONObject item_and = JSONObject.parseObject(JSON.toJSONString(value_and));
+                        String value = item_and.getString("value");
+                        if (value.isEmpty()) {
+                            return;
+                        }
+                        String dataForValue = (String) DataUtil.getValue(value, dataMap);
+                        StringBuilder andBuilder = new StringBuilder()
+                                // back-tick the column name so reserved words are safe
+                                .append(Constant.CHAR)
+                                .append(item_and.getString("field"))
+                                .append(Constant.CHAR)
+                                .append(item_and.getString("compare_symbol"))
+                                .append("'" + dataForValue + "'")
+                                .append(" and ");
+                        orBuilder.append(andBuilder);
+                    });
+                    if (orBuilder.length() == 0) {
+                        return;
+                    }
+                    // drop the trailing " and "
+                    orBuilder.delete(orBuilder.length() - 5, orBuilder.length());
+
+                    // parenthesise each group when there is more than one OR branch
+                    if (or.size() > 1) {
+                        orBuilder.insert(0, " ( ");
+                        orBuilder.append(" )");
+                    }
+
+                    conditionBuilder.append(orBuilder).append(" or ");
+
+                });
+                // drop the trailing " or "
+                conditionBuilder.delete(conditionBuilder.length() - 4, conditionBuilder.length());
+
+                // attach the WHERE clause
+                sqlBuilder.append(conditionBuilder);
+            }
+            // rows per batch
+            int batchSize = 1000;
+            // initial offset
+            int offset = 0;
+            if (jsonObject.containsKey(Constant.TRACE) && Boolean.TRUE.equals(jsonObject.getBoolean(Constant.TRACE))) {
+                log.info("trace run, capped at 1 row");
+                sqlBuilder.append(" LIMIT 1");
+            } else {
+                sqlBuilder.append(" LIMIT " + batchSize);
+            }
+            // keep the base statement fixed so OFFSET is not appended repeatedly per loop
+            String baseSql = sqlBuilder.toString();
+            // get a connection for this scene
+            try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id)) {
+                int rowCount = 0;
+                while (true) {
+                    String sql = baseSql + " OFFSET " + offset;
+                    log.info("Generated Query SQL: " + sql);
+                    // a scrollable ResultSet is required for last()/beforeFirst() below
+                    try (PreparedStatement preparedStatement = connection.prepareStatement(sql,
+                            ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+                         ResultSet resultSet = preparedStatement.executeQuery()) {
+                        // row count of this batch
+                        resultSet.last();
+                        int totalRows = resultSet.getRow();
+                        resultSet.beforeFirst();
+                        int currentRow = 0;
+                        // walk the batch and push every row
+                        while (resultSet.next()) {
+                            currentRow++;
+                            Map rowMap = new HashMap<>(16);
+                            ResultSetMetaData metaData = resultSet.getMetaData();
+                            int columnCount = metaData.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                String columnName = metaData.getColumnName(i);
+                                Object columnValue = resultSet.getObject(i);
+                                rowMap.put(columnName, columnValue);
+                            }
+                            // flag the last row
+                            if (currentRow == totalRows) {
+                                rowMap.put("isLast", 1);
+                            }
+                            Map results = new HashMap<>(16);
+                            results.put("results", JSON.toJSONString(rowMap));
+                            results.put("status", 1);
+                            results.put("message", "成功");
+                            jsonObject.put("result", results);
+                            KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+                            log.info("pushed to Kafka, result:" + JSON.toJSONString(rowMap));
+                            rowCount++;
+                        }
+
+                        if (rowCount == 0) {
+                            log.info("query returned no rows");
+                            Map rowMap = new HashMap<>(16);
+                            for (Map.Entry entry : output.entrySet()) {
+                                rowMap.put(entry.getKey(), "");
+                            }
+                            rowMap.put("isLast", 1);
+                            Map results = new HashMap<>(16);
+                            results.put("results", JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue));
+                            results.put("status", 1);
+                            results.put("message", "成功");
+                            jsonObject.put("result", results);
+                            KfkUtil.sendKafka(JSON.toJSONString(jsonObject, JSONWriter.Feature.WriteMapNullValue));
+                            log.info("pushed to Kafka, result:" + JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue));
+                        }
+                        // a short batch means the end of the result set was reached
+                        if (totalRows < batchSize) {
+                            break;
+                        }
+                        // advance the offset for the next batch
+                        offset += batchSize;
+                    } catch (Throwable e) {
+                        log.error("executing the query and pushing results failed", e);
+                        break;
+                    }
+                }
+                log.info("query finished, rowCount:" + rowCount);
+            } catch (Throwable e) {
+                log.error("query execution failed", e);
+            }
+        } catch (Throwable e) {
+            log.error("query failed", e);
+        }
+
+    }
+
+    @Override
+    public void queryField(JSONObject jsonObject) {
+        // input payload
+        JSONObject input = jsonObject.getJSONObject("input");
+        log.info("queryField ---> input:" + JSON.toJSONString(input));
+        // database connection info
+        JSONObject conn_info = input.getJSONObject("conn_info");
+        // scene id
+        Integer scenes_id = jsonObject.getInteger("scenes_id");
+        // Build the column-listing statement
+        StringBuilder sqlBuilder = new StringBuilder(" SHOW COLUMNS FROM ").append(conn_info.getString("table"));
+        String sql = sqlBuilder.toString();
+        log.info("Generated QueryField SQL: " + sql);
+        try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id);
+             PreparedStatement preparedStatement = connection.prepareStatement(sql);
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            Map fieldMap = new LinkedHashMap();
+            while (resultSet.next()) {
+                String columnName = resultSet.getString("Field");
+                fieldMap.put(columnName, columnName);
+            }
+            Map results = new HashMap<>(16);
+            fieldMap.put("isLast", 1);
+            results.put("results", JSON.toJSONString(fieldMap));
+            results.put("status", 1);
+            results.put("message", "成功");
+
+            jsonObject.put("result", results);
+            KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+            log.info("pushed to Kafka, result:" + JSON.toJSONString(fieldMap));
+
+        } catch (Throwable e) {
+            log.error("querying table columns failed", e);
+        }
+
+    }
+
+    @Override
+    public void sqlImpl(JSONObject jsonObject) {
+        // input definition
+        JSONObject input = jsonObject.getJSONObject("input");
+        // output definition
+        JSONObject output = jsonObject.getJSONObject("output");
+
+        log.info("sqlImpl ---> input:" + JSON.toJSONString(input));
+        log.info("sqlImpl ---> output:" + JSON.toJSONString(output));
+
+        // database connection info
+        JSONObject conn_info = input.getJSONObject("conn_info");
+        // scene id
+        Integer scenes_id = jsonObject.getInteger("scenes_id");
+
+        JSONObject data = jsonObject.getJSONObject("data");
+        log.info("sqlImpl ---> data:" + JSON.toJSONString(data));
+        // stitch the SQL together from the prompt segments
+        JSONArray prompt = input.getJSONArray("prompt");
+        StringBuilder sqlBuilder = new StringBuilder();
+        prompt.forEach(value -> {
+            JSONObject item = JSONObject.parseObject(JSON.toJSONString(value));
+            // type 1 = literal text, type 2 = variable resolved from data
+            Integer type = item.getInteger("type");
+            if (type == 1) {
+                sqlBuilder.append(item.getString("value"));
+            } else if (type == 2) {
+                String[] fieldValue_split = item.getString("value").split(":");
+                JSONObject fieldData = data.getJSONObject(fieldValue_split[0]);
+                sqlBuilder.append(JSONPath.eval(fieldData, fieldValue_split[1]));
+            }
+        });
+        String sql = sqlBuilder.toString();
+        log.info("Generated sqlImpl SQL: " + sql);
+        // columns expected in the output
+        Set strings = output.keySet();
+        if (sql.toLowerCase().contains(Constant.SELECT)) {
+            if (jsonObject.containsKey(Constant.TRACE) && Boolean.TRUE.equals(jsonObject.getBoolean(Constant.TRACE))) {
+                log.info("trace run, capped at 1 row");
+                // append to the final string, not the builder, so the cap actually applies
+                sql = sql + " LIMIT 1";
+            }
+            try (Connection connection = dynamicDataSource.getConnection(conn_info,
+                    scenes_id);
+                 // a scrollable ResultSet is required for last()/beforeFirst() below
+                 PreparedStatement preparedStatement = connection.prepareStatement(sql,
+                         ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+                 ResultSet resultSet = preparedStatement.executeQuery()) {
+                int fieldType = input.getInteger("fieldType");
+                if (fieldType == 0) {
+                    // fieldType 0: return the whole result set in a single message
+                    List dataMapList = new ArrayList<>();
+                    while (resultSet.next()) {
+                        Map dataMap = new HashMap(16);
+                        ResultSetMetaData metaData = resultSet.getMetaData();
+                        int columnCount = metaData.getColumnCount();
+                        for (int i = 1; i <= columnCount; i++) {
+                            String columnName = metaData.getColumnLabel(i);
+                            Object columnValue = resultSet.getObject(i);
+                            dataMap.put(columnName, columnValue);
+                        }
+                        dataMapList.add(dataMap);
+                    }
+                    Map contentMap = new HashMap<>(16);
+                    contentMap.put("content", dataMapList);
+                    contentMap.put("isLast", 1);
+                    Map results = new HashMap<>(16);
+                    results.put("results", JSON.toJSONString(contentMap));
+                    results.put("status", 1);
+                    results.put("message", "成功");
+                    jsonObject.put("result", results);
+                    KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+                    log.info("pushed to Kafka, result:" + JSON.toJSONString(contentMap));
+                    return;
+                }
+                // otherwise push row by row
+                int rowCount = 0;
+                // row count of the result set
+                resultSet.last();
+                int totalRows = resultSet.getRow();
+                resultSet.beforeFirst();
+                int currentRow = 0;
+                while (resultSet.next()) {
+                    currentRow++;
+                    Map rowMap = new HashMap<>(16);
+                    ResultSetMetaData metaData = resultSet.getMetaData();
+                    int columnCount = metaData.getColumnCount();
+                    for (int i = 1; i <= columnCount; i++) {
+                        String columnName = metaData.getColumnLabel(i);
+                        Object columnValue = resultSet.getObject(i);
+                        log.info("column:" + columnName + ", value:" + columnValue);
+                        // only keep the columns requested in output
+                        if (strings.contains(columnName)) {
+                            rowMap.put(columnName, columnValue);
+                        }
+                    }
+                    // flag the last row
+                    if (currentRow == totalRows) {
+                        rowMap.put("isLast", 1);
+                    }
+                    rowMap.put("is_diffusion", true);
+                    Map results = new HashMap<>(16);
+                    results.put("results", JSON.toJSONString(rowMap));
+                    results.put("status", 1);
+                    results.put("message", "成功");
+                    jsonObject.put("result", results);
+                    KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+                    log.info("pushed to Kafka, result:" + JSON.toJSONString(rowMap));
+                    rowCount++;
+                }
+                if (rowCount == 0) {
+                    log.info("query returned no rows");
+                    Map rowMap = new HashMap<>(16);
+                    for (Map.Entry entry : output.entrySet()) {
+                        rowMap.put(entry.getKey(), "");
+                    }
+                    rowMap.put("isLast", 1);
+                    Map results = new HashMap<>(16);
+                    results.put("results", JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue));
+                    results.put("status", 1);
+                    results.put("message", "成功");
+                    jsonObject.put("result", results);
+                    KfkUtil.sendKafka(JSONObject.toJSONString(jsonObject, JSONWriter.Feature.WriteMapNullValue));
+                    log.info("pushed to Kafka, result:" + JSON.toJSONString(rowMap, JSONWriter.Feature.WriteMapNullValue));
+                }
+
+            } catch (Throwable e) {
+                log.error("execution failed", e);
+            }
+        } else {
+            // non-SELECT statements: execute and report the affected row count
+            int rowsInserted = 0;
+            try (Connection connection = dynamicDataSource.getConnection(conn_info, scenes_id);
+                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                rowsInserted = preparedStatement.executeUpdate();
+
+                log.info("executed, result:" + rowsInserted);
+
+                Map results = new HashMap<>(16);
+                Map contentMap = new HashMap<>(16);
+                contentMap.put("content", rowsInserted);
+                contentMap.put("isLast", 1);
+                results.put("results", JSON.toJSONString(contentMap));
+                results.put("status", 1);
+                results.put("message", "成功");
+                jsonObject.put("result", results);
+                KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
+                log.info("pushed to Kafka, result:" + JSON.toJSONString(contentMap));
+            } catch (Throwable e) {
+                log.error("execution failed", e);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        // scratch check: a value without ':' splits into a single element,
+        // so it is passed through literally by insertData()
+        String fieldValue = "2";
+        String[] fieldValue_split = fieldValue.split(":");
+        System.out.println(fieldValue_split.length);
+    }
+}
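Before the configuration classes, a compact sketch of the "prompt" mechanism that sqlImpl() above implements: type 1 segments are literal SQL text, type 2 segments are "nodeId:jsonPath" references resolved against the message's data section. The table, ids and values below are made up for illustration:

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;

public class PromptSketch {
    public static void main(String[] args) {
        // Hypothetical message carrying one literal and one variable segment
        JSONObject msg = JSON.parseObject(
                "{\"data\":{\"node-1\":{\"uid\":42}},"
                + "\"input\":{\"prompt\":["
                + "{\"type\":1,\"value\":\"SELECT name FROM t_user WHERE id = \"},"
                + "{\"type\":2,\"value\":\"node-1:$['uid']\"}]}}");
        StringBuilder sql = new StringBuilder();
        for (Object v : msg.getJSONObject("input").getJSONArray("prompt")) {
            JSONObject seg = JSONObject.parseObject(JSON.toJSONString(v));
            if (seg.getInteger("type") == 1) {
                // literal SQL text
                sql.append(seg.getString("value"));
            } else {
                // resolve "nodeId:jsonPath" against the data section
                String[] parts = seg.getString("value").split(":");
                sql.append(JSONPath.eval(
                        msg.getJSONObject("data").getJSONObject(parts[0]), parts[1]));
            }
        }
        System.out.println(sql); // SELECT name FROM t_user WHERE id = 42
    }
}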
diff --git a/src/main/java/com/bfd/dataprocess/util/AsyncConfig.java b/src/main/java/com/bfd/dataprocess/util/AsyncConfig.java
new file mode 100644
index 0000000..8e331e1
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/util/AsyncConfig.java
@@ -0,0 +1,37 @@
+package com.bfd.dataprocess.util;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Async task thread pool.
+ * {@code @EnableAsync} switches on Spring's async support, so methods annotated
+ * with {@code @Async} run off the calling thread; implementing AsyncConfigurer
+ * and overriding getAsyncExecutor supplies the custom pool those tasks run on.
+ *
+ * @author guowei
+ */
+@Configuration
+@EnableAsync
+public class AsyncConfig implements AsyncConfigurer {
+
+    @Override
+    public Executor getAsyncExecutor() {
+        // define the pool
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        // core pool size
+        threadPoolTaskExecutor.setCorePoolSize(10);
+        // maximum pool size
+        threadPoolTaskExecutor.setMaxPoolSize(50);
+        // queue capacity
+        threadPoolTaskExecutor.setQueueCapacity(200);
+        // initialise
+        threadPoolTaskExecutor.initialize();
+
+        return threadPoolTaskExecutor;
+    }
+
+}
diff --git a/src/main/java/com/bfd/dataprocess/util/DataUtil.java b/src/main/java/com/bfd/dataprocess/util/DataUtil.java
new file mode 100644
index 0000000..3eb0c32
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/util/DataUtil.java
@@ -0,0 +1,65 @@
+package com.bfd.dataprocess.util;
+
+import java.util.Map;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONPath;
+import com.bfd.dataprocess.config.Constant;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Resolves a dataValue from a key expression.
+ *
+ * @author jinming
+ * @version 1.0
+ * @since 2023/11/1 9:54
+ */
+@Slf4j
+public class DataUtil {
+    /**
+     * @param key     the key expression
+     * @param dataMap the data map
+     * @return the resolved dataValue; keys without the ":$" marker are returned unchanged
+     */
+    public static Object getValue(String key, Map dataMap) {
+        try {
+            // an empty expression resolves to the empty string
+            if (key.equals(Constant.EMPTY)) {
+                return Constant.EMPTY;
+            }
+            if (!key.contains(Constant.NOT_KEY)) {
+                return key;
+            }
+            Object dataValue;
+            String isJson = "#json#";
+            if (key.contains(isJson)) {
+                // first split: the part before #json# addresses a JSON string
+                String[] keySplit = key.split(isJson);
+                String firstDataKey = keySplit[0];
+                String[] firstDataKeySplit = firstDataKey.split(":");
+                // look up the JSON text for the node id and parse it
+                String dataJson = (String) dataMap.get(firstDataKeySplit[0]);
+                JSONObject dataJsonObject = JSON.parseObject(dataJson);
+                // evaluate the first JSONPath; its result is itself JSON text
+                String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]);
+                String secDataKey = keySplit[1];
+                JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson);
+                // the part after #json# is evaluated against that nested document
+                dataValue = JSONPath.eval(firstDataJsonObject, secDataKey);
+                return dataValue;
+            }
+            String[] keySplit = key.split(":");
+            String jsonPath = keySplit[1];
+            String dataJson = (String) dataMap.get(keySplit[0]);
+            JSONObject dataJsonObject = JSON.parseObject(dataJson);
+            dataValue = JSONPath.eval(dataJsonObject, jsonPath);
+            return dataValue;
+        } catch (Exception e) {
+            log.error("JSONPath evaluation failed,", e);
+            return null;
+        }
+
+    }
+}
\ No newline at end of file
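A small usage sketch for DataUtil.getValue() above; the node id and JSON content are hypothetical. Note that dataMap values are expected to be JSON *strings*, which getValue parses before applying the JSONPath:

import java.util.HashMap;
import java.util.Map;
import com.bfd.dataprocess.util.DataUtil;

public class DataUtilSketch {
    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("node-1", "{\"user\":{\"name\":\"Tom\"}}");
        // contains the ":$" marker, so it is resolved via JSONPath -> Tom
        System.out.println(DataUtil.getValue("node-1:$['user']['name']", dataMap));
        // no ":$" marker, so the literal key is returned as-is
        System.out.println(DataUtil.getValue("plain text", dataMap));
    }
}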
diff --git a/src/main/java/com/bfd/dataprocess/util/DynamicDataSource.java b/src/main/java/com/bfd/dataprocess/util/DynamicDataSource.java
new file mode 100644
index 0000000..aeca486
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/util/DynamicDataSource.java
@@ -0,0 +1,105 @@
+package com.bfd.dataprocess.util;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
+import org.springframework.stereotype.Component;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author guowei
+ */
+@Component
+@Slf4j
+public class DynamicDataSource extends AbstractRoutingDataSource {
+
+    public static Map<Object, Object> targetDataSources = new HashMap<>();
+
+    @Override
+    protected Object determineCurrentLookupKey() {
+        // The lookup key could be derived from request parameters etc.;
+        // "default" is the fallback identifier and can be changed as needed
+        return "default";
+    }
+
+    @Override
+    public void afterPropertiesSet() {
+        setTargetDataSources(targetDataSources);
+        // register the default data source
+        setDefaultTargetDataSource(targetDataSources.get("default"));
+        super.afterPropertiesSet();
+    }
+
+    public void addTargetDataSource(Integer key, DataSource dataSource) {
+        targetDataSources.put(key, dataSource);
+        setTargetDataSources(targetDataSources);
+        afterPropertiesSet();
+    }
+
+    public Connection getConnection(JSONObject connInfo, int scenes_id) throws SQLException {
+        // 2024-06-28: overseas databases must be reachable, so pin the session time zone
+        String dbUrl = "jdbc:mysql://" + connInfo.get("hostname") + ":" + connInfo.get("port") + "/" + connInfo.get("database") + "?serverTimezone=Asia/Shanghai";
+        DataSource dataSource = null;
+        synchronized (targetDataSources) {
+            if (!targetDataSources.containsKey(scenes_id)) {
+                log.info("no data source yet (or it was evicted), creating one, dbUrl:" + dbUrl);
+                // create a new pool and register it under the scene id
+                String dbUsername = connInfo.getString("username");
+                String dbPassword = connInfo.getString("password");
+                dataSource = createDataSource(dbUrl, dbUsername, dbPassword);
+                addTargetDataSource(scenes_id, dataSource);
+            } else {
+                dataSource = (DataSource) targetDataSources.get(scenes_id);
+            }
+        }
+        return dataSource.getConnection();
+    }
+
+    private boolean isValid(DataSource dataSource) {
+        // probe the pool by borrowing a connection
+        try (Connection connection = dataSource.getConnection()) {
+            return true;
+        } catch (SQLException e) {
+            // borrowing failed, the pool is no longer usable
+            return false;
+        }
+    }
+
+    /**
+     * Creates a new HikariCP pool for the given coordinates.
+     */
+    private DataSource createDataSource(String dbUrl, String dbUsername, String dbPassword) {
+        HikariDataSource dataSource = new HikariDataSource();
+        dataSource.setJdbcUrl(dbUrl);
+        dataSource.setUsername(dbUsername);
+        dataSource.setPassword(dbPassword);
+        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
+        // pool size
+        dataSource.setMaximumPoolSize(5);
+        dataSource.setConnectionTestQuery("SELECT 1");
+        // idle timeout: 10 minutes
+        dataSource.setIdleTimeout(600000);
+        // max lifetime: 30 minutes
+        dataSource.setMaxLifetime(1800000);
+        // connection timeout: 30 seconds
+        dataSource.setConnectionTimeout(30000);
+        // leak detection threshold: 1 minute
+        dataSource.setLeakDetectionThreshold(60000);
+        return dataSource;
+    }
+}
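Sketch of how the service code uses the class above: the first getConnection() for a given scenes_id builds a HikariCP pool from conn_info and caches it; later calls reuse that pool, and closing the borrowed connection just returns it. All coordinates below are placeholders:

import java.sql.Connection;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.dataprocess.util.DynamicDataSource;

public class DataSourceSketch {
    void example(DynamicDataSource dynamicDataSource) throws Exception {
        JSONObject connInfo = new JSONObject();
        connInfo.put("hostname", "127.0.0.1"); // placeholder coordinates
        connInfo.put("port", 3306);
        connInfo.put("database", "demo");
        connInfo.put("username", "root");
        connInfo.put("password", "secret");
        // scene 1: creates the pool on first use, reuses it afterwards
        try (Connection c = dynamicDataSource.getConnection(connInfo, 1)) {
            // use the connection; close() returns it to the scene's pool
        }
    }
}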
@Value("${crawl.kafka.topic}") + public void setTopic(String topic) { + KfkUtil.topic = topic; + } + + @Value("${crawl.kafka.brokers}") + public void setBrokerList(String brokerList) { + KfkUtil.brokerList = brokerList; + } + private static KafkaProducer kafkaProducer; + + public static int num = 0; + + /** + * 获取KafkaProducer实例 + */ + public static KafkaProducer getProducer() { + if (kafkaProducer == null) { + Properties props = new Properties(); + //xxx服务器ip + props.put("bootstrap.servers", brokerList); + //所有follower都响应了才认为消息提交成功,即"committed" + props.put("acks", "all"); + //retries = MAX 无限重试,直到你意识到出现了问题:) + props.put("retries", 3); + //producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 + props.put("batch.size", 16384); + //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms + //延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 + props.put("linger.ms", 1); + //producer可以用来缓存数据的内存大小。 + props.put("buffer.memory", 33554432); + props.put("key.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + kafkaProducer = new KafkaProducer(props); + } + return kafkaProducer; + } + + /** + * 关闭KafkaProducer实例 + */ + public static void closeProducer() { + if (kafkaProducer != null) { + log.info("----------close producer----------"); + kafkaProducer.close(); + kafkaProducer = null; + } + } + + public static void sendKafka(String resultData) { + KafkaProducer producer = getProducer(); + ProducerRecord se = new ProducerRecord(topic, resultData); + producer.send(se); + log.info("发送kafka成功"); +// num++; + } +} diff --git a/src/main/java/com/bfd/dataprocess/util/ZookeeperConfig.java b/src/main/java/com/bfd/dataprocess/util/ZookeeperConfig.java new file mode 100644 index 0000000..37ba8d2 --- /dev/null +++ b/src/main/java/com/bfd/dataprocess/util/ZookeeperConfig.java @@ -0,0 +1,25 @@ +package com.bfd.dataprocess.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author jian.mao + * @date 2024年4月16日 + * @description + */ +@Configuration +public class ZookeeperConfig { + @Value("${zookeeper.connection-string}") + private String connectionString; + @Bean + public CuratorFramework curatorFramework() { + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); + curatorFramework.start(); + return curatorFramework; + } +} diff --git a/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java b/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java new file mode 100644 index 0000000..99708dc --- /dev/null +++ b/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java @@ -0,0 +1,65 @@ +package com.bfd.dataprocess.util; + +import com.alibaba.fastjson2.JSONObject; +import com.bfd.dataprocess.config.Constant; +import com.zaxxer.hikari.HikariDataSource; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.springframework.beans.factory.annotation.Autowired; +import 
diff --git a/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java b/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java
new file mode 100644
index 0000000..99708dc
--- /dev/null
+++ b/src/main/java/com/bfd/dataprocess/util/ZookeeperNodeMonitor.java
@@ -0,0 +1,65 @@
+package com.bfd.dataprocess.util;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.bfd.dataprocess.config.Constant;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author jian.mao
+ * @date 2024-04-17
+ * @description Watches the publish node and tears down a scene's connection pool on "stop"
+ */
+@Component
+@Slf4j
+public class ZookeeperNodeMonitor {
+
+    @Autowired
+    private CuratorFramework curatorFramework;
+
+    @Value("${zookeeper.publish-node}")
+    private String nodePath;
+
+    @PostConstruct
+    public void init() {
+        try {
+            // watch the node for data changes
+            NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
+            nodeCache.start();
+
+            nodeCache.getListenable().addListener(new NodeCacheListener() {
+                @Override
+                public void nodeChanged() throws Exception {
+                    byte[] data = nodeCache.getCurrentData().getData();
+                    String nodeData = new String(data);
+                    log.info("Node data changed: " + nodeData);
+                    JSONObject zkData = JSONObject.parseObject(nodeData);
+                    if (zkData.getString(Constant.OPERATION).equals(Constant.STOP)) {
+                        log.info("stop requested, trying to close the connection pool");
+                        Integer scenes_id = zkData.getInteger("scenes_id");
+                        // react to the change, e.g. release the scene's pool
+                        if (DynamicDataSource.targetDataSources.containsKey(scenes_id)) {
+                            HikariDataSource dataSource = (HikariDataSource) DynamicDataSource.targetDataSources.get(scenes_id);
+                            dataSource.close();
+                            DynamicDataSource.targetDataSources.remove(scenes_id);
+                            log.info("scene " + scenes_id + ": connection pool closed");
+                        }
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("failed to start the Zookeeper node monitor", e);
+        }
+    }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..acf370f
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,37 @@
+crawl:
+  kafka:
+    topic: produce_analyze
+    brokers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092
+server:
+  port: 9392
+# log level
+logging:
+  level:
+    com:
+      bfd: INFO
+  # log path
+  log:
+    path: ./logs
+spring:
+  boot:
+    admin:
+      client:
+        url: http://172.18.1.147:8001
+        instance:
+          service-base-url: http://172.18.1.147:9392
+  application:
+    name: mysql_app
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+  endpoint:
+    health:
+      show-details: always
+  health:
+    elasticsearch:
+      enabled: false
+zookeeper:
+  connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181
+  publish-node: /analyze
\ No newline at end of file
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..0c59240
--- /dev/null
+++ b/src/main/resources/logback-spring.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
+    <springProperty scope="context" name="logging.path" source="logging.log.path"/>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <append>true</append>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>${logging.level}</level>
+        </filter>
+        <file>${logging.path}/crawlSchedule.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</fileNamePattern>
+            <!-- keep 7 days of history -->
+            <maxHistory>7</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="FILE"/>
+    </root>
+
+</configuration>
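Finally, a sketch of the control message the monitor above reacts to: publishing {"operation":"stop","scenes_id":<id>} to the configured node (publish-node: /analyze in application.yml) closes and evicts that scene's HikariCP pool. The connect string and scene id below are placeholders, and the node is assumed to already exist:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class StopScenePoolSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // ZookeeperNodeMonitor parses this JSON and, on "stop", closes the
        // pool registered under scenes_id 1
        String payload = "{\"operation\":\"stop\",\"scenes_id\":1}";
        client.setData().forPath("/analyze", payload.getBytes("UTF-8"));
        client.close();
    }
}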