commit 73cc60e5bc04e2410af85f75dbf8c25b41be0480 Author: 55007 <55007@maojian> Date: Tue Jan 7 17:15:27 2025 +0800 指标计算应用 diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..f7e4a1d --- /dev/null +++ b/.classpath @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..f5be49a --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + datacalculate + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..839d647 --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding//src/test/java=UTF-8 +encoding/=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..2f5cc74 --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,8 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore +org.eclipse.jdt.core.compiler.release=disabled +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/README.md b/README.md new file mode 100644 index 0000000..26900f5 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +指标计算 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..604ccfe --- /dev/null +++ b/pom.xml @@ -0,0 +1,165 @@ + + + 4.0.0 + com.bfd + datacalculate + 0.0.1-SNAPSHOT + datacalculate + datacalculate + + 1.8 + UTF-8 + UTF-8 + 2.2.4.RELEASE + + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.apache.commons + commons-math3 + 3.6.1 + + + com.alibaba.fastjson2 + fastjson2 + 2.0.12 + + + cn.hutool + hutool-all + 5.8.27 + + + org.apache.kafka + kafka-clients + 2.7.1 + + + com.squareup.okhttp3 + okhttp + 3.11.0 + + + org.thymeleaf + thymeleaf + 3.0.12.RELEASE + + + de.codecentric + spring-boot-admin-client + 2.2.4 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + *.properties + *.yml + *.yaml + + + + + com.bfd.datacalculate.DataCalculateApplication + + true + + lib/ + + false + + + + config/ + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + package + + copy-dependencies + + + ${project.build.directory}/lib/ + + + + + + + maven-resources-plugin + + + copy-resources + package + + copy-resources + + + + + + src/main/resources/ + + *.properties + *.yml + *.yaml + + + + ${project.build.directory}/config + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + diff --git a/src/main/java/com/bfd/datacalculate/DataCalculateApplication.java b/src/main/java/com/bfd/datacalculate/DataCalculateApplication.java new file mode 100644 index 0000000..8846b3e --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/DataCalculateApplication.java @@ -0,0 +1,15 @@ +package com.bfd.datacalculate; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableScheduling +public class DataCalculateApplication { + + public static void main(String[] args) { + SpringApplication.run(DataCalculateApplication.class, args); + } + +} diff --git a/src/main/java/com/bfd/datacalculate/config/GlobalConfig.java b/src/main/java/com/bfd/datacalculate/config/GlobalConfig.java new file mode 100644 index 0000000..8c041c3 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/config/GlobalConfig.java @@ -0,0 +1,49 @@ +package com.bfd.datacalculate.config; + +import com.alibaba.fastjson2.JSONObject; +import com.bfd.datacalculate.model.DataEntity; +import com.bfd.datacalculate.model.DataGroup; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author guowei + */ +@Component +public class GlobalConfig { + /** 全局锁 **/ + public static final Object lock = new Object(); + + public static final long INACTIVITY_THRESHOLD = 60_000; // 1分钟的毫秒数 + + public static final Map dataGroups = new ConcurrentHashMap<>(); + + public static final Map originalParameters = new ConcurrentHashMap<>(); + + // function 映射 + public static final Map FUNCTION_MAP; + static { + Map functionMap = new HashMap<>(); + functionMap.put(1, "求和"); + functionMap.put(2, "平均值"); + functionMap.put(3, "最大值"); + functionMap.put(4, "最小值"); + functionMap.put(5, "加权求和"); + FUNCTION_MAP = Collections.unmodifiableMap(functionMap); // 不可变的映射 + } + + // exception_handle 映射 + public static final Map EXCEPTION_HANDLE_MAP; + static { + Map exceptionHandleMap = new HashMap<>(); + exceptionHandleMap.put(1, "忽略异常值"); + exceptionHandleMap.put(2, "用均值代替"); + EXCEPTION_HANDLE_MAP = Collections.unmodifiableMap(exceptionHandleMap); // 不可变的映射 + } + + public static final String UNDERLINE = "_"; +} diff --git a/src/main/java/com/bfd/datacalculate/controller/ApiController.java b/src/main/java/com/bfd/datacalculate/controller/ApiController.java new file mode 100644 index 0000000..9adca77 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/controller/ApiController.java @@ -0,0 +1,39 @@ +package com.bfd.datacalculate.controller; + +import com.alibaba.fastjson2.JSONObject; +import com.bfd.datacalculate.config.GlobalConfig; +import com.bfd.datacalculate.service.DataMergeService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; + +/** + * @author guowei + */ +@RestController +@Slf4j +@RequestMapping(value = "/data") +@CrossOrigin(origins = "*", maxAge = 3600) +public class ApiController { + @Resource + DataMergeService dataMergeService; + /** + * 文档转换 Api + * @param jsonObject + * @return + */ + @RequestMapping(value = "/calculate", method = RequestMethod.POST, produces = "application/json") + @ResponseBody + public String varAna(@RequestBody JSONObject jsonObject) { + log.info("数据合并计算参数:"+jsonObject); + +// JSONObject data = jsonObject.getJSONObject("data"); +// String businessKey = data.getString("businessKey"); + + synchronized(GlobalConfig.lock){ + dataMergeService.merge(jsonObject); + } + return "success"; + } +} diff --git a/src/main/java/com/bfd/datacalculate/model/DataEntity.java b/src/main/java/com/bfd/datacalculate/model/DataEntity.java new file mode 100644 index 0000000..c10f703 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/model/DataEntity.java @@ -0,0 +1,18 @@ +package com.bfd.datacalculate.model; + +import com.bfd.datacalculate.service.CalculationStrategy; +import com.bfd.datacalculate.service.impl.*; +import lombok.Data; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +/** + * @author guowei + */ +@Data +public class DataEntity { + private double value; + private Integer type; // 计算类型 +} diff --git a/src/main/java/com/bfd/datacalculate/model/DataGroup.java b/src/main/java/com/bfd/datacalculate/model/DataGroup.java new file mode 100644 index 0000000..5cf4e50 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/model/DataGroup.java @@ -0,0 +1,51 @@ +package com.bfd.datacalculate.model; + +import com.bfd.datacalculate.service.CalculationStrategy; +import com.bfd.datacalculate.service.impl.*; +import lombok.Data; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +/** + * @author guowei + */ +@Data +public class DataGroup { + private List dataList = new ArrayList<>(); + private Instant lastUpdateTime = Instant.now(); + private List weights = new ArrayList<>(); // 仅用于加权求和 + + // 根据 type 选择计算策略 + public CalculationStrategy getStrategy() { + if (dataList.isEmpty()) { + throw new IllegalArgumentException("Data list is empty"); + } + + Integer type = dataList.get(0).getType(); + CalculationStrategy strategy; + + switch (type) { + case 1: + strategy = new SumStrategy(); + break; + case 2: + strategy = new AverageStrategy(); + break; + case 3: + strategy = new MaxStrategy(); + break; + case 4: + strategy = new MinStrategy(); + break; + case 5: + strategy = new WeightedSumStrategy(weights); + break; + default: + throw new IllegalArgumentException("Unknown calculation type"); + } + + return strategy; + } +} diff --git a/src/main/java/com/bfd/datacalculate/service/CalculationStrategy.java b/src/main/java/com/bfd/datacalculate/service/CalculationStrategy.java new file mode 100644 index 0000000..6a9391b --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/CalculationStrategy.java @@ -0,0 +1,11 @@ +package com.bfd.datacalculate.service; + +import java.util.List; + +/** + * @author guowei + */ +public interface CalculationStrategy { + double calculate(List data); + +} diff --git a/src/main/java/com/bfd/datacalculate/service/DataMergeService.java b/src/main/java/com/bfd/datacalculate/service/DataMergeService.java new file mode 100644 index 0000000..d062ffb --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/DataMergeService.java @@ -0,0 +1,14 @@ +package com.bfd.datacalculate.service; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.stereotype.Service; + +/** + * @author guowei + * 数据合并处理 + */ +@Service +public interface DataMergeService { + + void merge(JSONObject jsonObject); +} diff --git a/src/main/java/com/bfd/datacalculate/service/DataProcessService.java b/src/main/java/com/bfd/datacalculate/service/DataProcessService.java new file mode 100644 index 0000000..0cafd98 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/DataProcessService.java @@ -0,0 +1,84 @@ +package com.bfd.datacalculate.service; + +import cn.hutool.core.util.IdUtil; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.bfd.datacalculate.config.GlobalConfig; +import com.bfd.datacalculate.model.DataEntity; +import com.bfd.datacalculate.model.DataGroup; +import com.bfd.datacalculate.utils.KfkUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.*; + +/** + * @author guowei + */ +@Service +@Slf4j +public class DataProcessService { + + @Scheduled(fixedRate = 60_000) // 每分钟执行一次 + public void scheduledProcessing() { + log.info("开始执行计算,有 {} 个数据组等待计算",GlobalConfig.dataGroups.isEmpty()? 0 : GlobalConfig.dataGroups.size()); + if (GlobalConfig.dataGroups.isEmpty()) { + // 如果为空,则无需进行任何处理 + System.out.println("No data to process."); + return; + } + Instant now = Instant.now(); + Iterator> iterator = GlobalConfig.dataGroups.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + DataGroup group = entry.getValue(); + + synchronized (group) { + if (now.minusMillis(GlobalConfig.INACTIVITY_THRESHOLD).isAfter(group.getLastUpdateTime())) { + log.info("数据组ID->{}:到达计算时间限制{},开始计算",entry.getKey(),GlobalConfig.INACTIVITY_THRESHOLD); + processData(entry.getKey(), group); + iterator.remove(); // 使用迭代器安全地移除元素 + + } + } + } + } + + private void processData(String key, DataGroup group) { + Map result = new HashMap<>(16); + result.put("status",1); + result.put("message","成功"); + Map results = new HashMap<>(16); + results.put("isLast",1); + results.put("id", IdUtil.randomUUID()); + Double calculate = 0.0; + try { + // 处理数据的逻辑 + CalculationStrategy strategy = group.getStrategy(); + List doubles = new ArrayList<>(); + List dataList = group.getDataList(); + for (DataEntity data : dataList) { + doubles.add(data.getValue()); + } + calculate = strategy.calculate(doubles); + log.info("计算{},数组为{},结果={}", GlobalConfig.FUNCTION_MAP.get(dataList.get(0).getType()), JSON.toJSONString(doubles), calculate); + System.out.println(calculate); + }catch (Exception e){ + e.printStackTrace(); + log.error("计算数据异常",e); + result.put("status",2); + result.put("message","失败"); + } + results.put("content",calculate.toString()); + result.put("results",JSON.toJSONString(results)); + JSONObject jsonObject = GlobalConfig.originalParameters.get(key); + jsonObject.put("result",result); + System.out.println(JSON.toJSONString(jsonObject)); + KfkUtil.sendKafka(JSON.toJSONString(jsonObject)); + log.info("发送kfk成功,result:{}",JSON.toJSONString(result)); + GlobalConfig.originalParameters.remove(key); + } +} diff --git a/src/main/java/com/bfd/datacalculate/service/impl/AverageStrategy.java b/src/main/java/com/bfd/datacalculate/service/impl/AverageStrategy.java new file mode 100644 index 0000000..ce8266a --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/AverageStrategy.java @@ -0,0 +1,22 @@ +package com.bfd.datacalculate.service.impl; + +import com.bfd.datacalculate.service.CalculationStrategy; + +import java.util.List; + +/** + * @author guowei + */ +public class AverageStrategy implements CalculationStrategy { + + @Override + public double calculate(List data) { + return data.stream() + .filter(d -> !Double.isNaN(d)) // 过滤掉 NaN 值 + .mapToDouble(Double::doubleValue) + .average() + .orElse(0.0); // 如果过滤后没有数据,返回 0.0 + } + + +} diff --git a/src/main/java/com/bfd/datacalculate/service/impl/DataMergeServiceImpl.java b/src/main/java/com/bfd/datacalculate/service/impl/DataMergeServiceImpl.java new file mode 100644 index 0000000..1fd1fec --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/DataMergeServiceImpl.java @@ -0,0 +1,134 @@ +package com.bfd.datacalculate.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.bfd.datacalculate.config.GlobalConfig; +import com.bfd.datacalculate.model.DataEntity; +import com.bfd.datacalculate.model.DataGroup; +import com.bfd.datacalculate.service.DataMergeService; +import com.bfd.datacalculate.utils.Utils; +import jdk.nashorn.internal.objects.Global; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; + +/** + * @author guowei + */ +@Service +@Slf4j +public class DataMergeServiceImpl implements DataMergeService { + /** + * 合并多条数据 + * @param jsonObject + */ + @Override + public void merge(JSONObject jsonObject) { + JSONObject data = jsonObject.getJSONObject("data"); + String businessKey = data.getString("businessKey"); + //输入 + JSONObject input = jsonObject.getJSONObject("input"); + log.info("合并计算 --> " + businessKey + ";data -->" + jsonObject); + Integer type = input.getInteger("type"); + GlobalConfig.originalParameters.putIfAbsent(businessKey+GlobalConfig.UNDERLINE+type,jsonObject); + + //函数 + Integer exception_type = input.getInteger("exception_type"); + double finalValue; + if (type!=5) { + String field = input.getString("field"); + Object num = Utils.jsonParse(field, data); + try{ + finalValue = Double.parseDouble((String) num); + }catch (Exception e) { + log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type)); + //不是数值 + if (exception_type==1){ + return; + }else { + finalValue = Double.NaN; + } + } +// if ( !(num instanceof Number) ) { +// log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type)); +// //不是数值 +// if (exception_type==1){ +// return; +// }else { +// finalValue = Double.NaN; +// } +// }else { +// // num 是数值,将其转换为 double +// finalValue = ((Number) num).doubleValue(); +// } + DataEntity dataEntity = new DataEntity(); + dataEntity.setValue(finalValue); + dataEntity.setType(type); + receiveData(businessKey+GlobalConfig.UNDERLINE+type,dataEntity); + }else { + JSONArray fields = input.getJSONArray("fields"); + for (Object value : fields) { + JSONObject item = JSONObject.parseObject(JSON.toJSONString(value)); + String field = item.getString("field"); + Object num = Utils.jsonParse(field, data); + Object coefficient = item.get("coefficient"); + if (!(coefficient instanceof Number)){ + log.info("系数 {} 不是数值,跳过",coefficient); + continue; + } + try{ + finalValue = Double.parseDouble((String) num); + }catch (Exception e) { + log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type)); + //不是数值 + if (exception_type==1){ + return; + }else { + finalValue = Double.NaN; + } + } +// if (!(num instanceof Number)) { +// log.info("值不是数值,异常值处理方式:{}", GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type)); +// // 不是数值 +// if (exception_type == 1) { +// continue; // 忽略异常值,继续处理下一个 +// } else { +// finalValue = Double.NaN; // 用 NaN 替代 +// } +// } else { +// // num 是数值,将其转换为 double +// finalValue = ((Number) num).doubleValue(); +// } + + DataEntity dataEntity = new DataEntity(); + dataEntity.setValue(finalValue); + dataEntity.setType(type); + receiveData(businessKey+GlobalConfig.UNDERLINE+type, dataEntity,((Number) coefficient).doubleValue()); + } + } + } + + // 接收数据 + public void receiveData(String businessKey, DataEntity data,Double... coefficients) { + // 获取或创建 DataGroup + DataGroup dataGroup = GlobalConfig.dataGroups.computeIfAbsent(businessKey, k -> new DataGroup()); + // 添加数据到 DataGroup + dataGroup.getDataList().add(data); + // 更新最后更新时间 + dataGroup.setLastUpdateTime(Instant.now()); + + // 添加权重 + if (coefficients != null) { + for (Double coefficient : coefficients) { + if (coefficient != null) { + dataGroup.getWeights().add(coefficient); + } + } + } + } + + +} diff --git a/src/main/java/com/bfd/datacalculate/service/impl/MaxStrategy.java b/src/main/java/com/bfd/datacalculate/service/impl/MaxStrategy.java new file mode 100644 index 0000000..bce84b9 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/MaxStrategy.java @@ -0,0 +1,20 @@ +package com.bfd.datacalculate.service.impl; + +import com.bfd.datacalculate.service.CalculationStrategy; + +import java.util.List; + +/** + * @author guowei + */ +public class MaxStrategy implements CalculationStrategy { + + @Override + public double calculate(List data) { + return data.stream() + .filter(d -> !Double.isNaN(d)) // 过滤掉 NaN 值 + .mapToDouble(Double::doubleValue) + .max() + .orElse(Double.NaN); // 如果过滤后没有数据,返回 NaN + } +} diff --git a/src/main/java/com/bfd/datacalculate/service/impl/MinStrategy.java b/src/main/java/com/bfd/datacalculate/service/impl/MinStrategy.java new file mode 100644 index 0000000..aac86b5 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/MinStrategy.java @@ -0,0 +1,19 @@ +package com.bfd.datacalculate.service.impl; + +import com.bfd.datacalculate.service.CalculationStrategy; + +import java.util.List; + +/** + * @author guowei + */ +public class MinStrategy implements CalculationStrategy { + @Override + public double calculate(List data) { + return data.stream() + .filter(d -> !Double.isNaN(d)) // 过滤掉 NaN 值 + .mapToDouble(Double::doubleValue) + .min() + .orElse(Double.NaN); // 如果过滤后没有数据,返回 NaN + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/datacalculate/service/impl/SumStrategy.java b/src/main/java/com/bfd/datacalculate/service/impl/SumStrategy.java new file mode 100644 index 0000000..bc5aa53 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/SumStrategy.java @@ -0,0 +1,32 @@ +package com.bfd.datacalculate.service.impl; + +import com.bfd.datacalculate.service.CalculationStrategy; + +import java.util.List; + +/** + * @author guowei + */ +public class SumStrategy implements CalculationStrategy { + @Override + public double calculate(List data) { +// return data.stream().mapToDouble(Double::doubleValue).sum(); + // 过滤出所有有效数值(非 NaN 值),并计算它们的总和和计数 + double sum = data.stream() + .filter(d -> !Double.isNaN(d)) // 过滤掉 NaN 值 + .mapToDouble(Double::doubleValue) + .sum(); + + long count = data.stream() + .filter(d -> !Double.isNaN(d)) // 统计有效数值的数量 + .count(); + + // 如果没有有效数值,返回 0 避免除以 0 的情况 + double mean = count > 0 ? sum / count : 0.0; + + // 将 NaN 值替换为均值,然后进行最终求和 + return data.stream() + .mapToDouble(d -> Double.isNaN(d) ? mean : d) // 替换 NaN 为均值 + .sum(); + } +} diff --git a/src/main/java/com/bfd/datacalculate/service/impl/WeightedSumStrategy.java b/src/main/java/com/bfd/datacalculate/service/impl/WeightedSumStrategy.java new file mode 100644 index 0000000..8ee7084 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/service/impl/WeightedSumStrategy.java @@ -0,0 +1,72 @@ +package com.bfd.datacalculate.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.bfd.datacalculate.service.CalculationStrategy; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author guowei + */ +@Slf4j +public class WeightedSumStrategy implements CalculationStrategy { + private final List weights; + + public WeightedSumStrategy(List weights) { + this.weights = weights; + } + + @Override + public double calculate(List data) { + log.info("加权求和-> 数组:{},系数:{}", JSON.toJSONString(data),JSON.toJSONString(weights)); + if (data.size() != weights.size()) { + log.error("Data and weights size must match."); + return 0; + } + double weightedSum = 0.0; + double totalWeight = 0.0; + List validData = new ArrayList<>(); + + // 找到有效的数值并计算均值 + for (Double value : data) { + if (!Double.isNaN(value)) { + validData.add(value); + } + } + + // 计算均值 + double meanValue = validData.stream() + .mapToDouble(Double::doubleValue) + .average() + .orElse(0.0); + + // 用均值替代 NaN + for (int i = 0; i < data.size(); i++) { + Double value = data.get(i); + Double weight = weights.get(i); + + if (Double.isNaN(value)) { + value = meanValue; // 替换 NaN 为均值 + } + if (Double.isNaN(weight)) { + weight = 0.0; // 如果权重是 NaN,设置为 0 或其他值 + } + + weightedSum += value * weight; + totalWeight += weight; + } + + // 如果没有有效的权重,则返回 0 或其他值 + if (totalWeight == 0) { + return 0.0; + } + + //加权和 +// return weightedSum; + // 如果需要归一化加权和,使用总权重进行计算 + log.info("加权和:{},总权重:{}", weightedSum, totalWeight); + return weightedSum / totalWeight; + } +} diff --git a/src/main/java/com/bfd/datacalculate/utils/KfkUtil.java b/src/main/java/com/bfd/datacalculate/utils/KfkUtil.java new file mode 100644 index 0000000..5818b13 --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/utils/KfkUtil.java @@ -0,0 +1,81 @@ +package com.bfd.datacalculate.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.Properties; + +/** + * @author guowei + * kfk工具类 + */ +@Component +@Slf4j +public class KfkUtil { + private static String topic; + + private static String brokerList; + + @Value("${crawl.kafka.topic}") + public void setTopic(String topic) { + KfkUtil.topic = topic; + } + + @Value("${crawl.kafka.brokers}") + public void setBrokerList(String brokerList) { + KfkUtil.brokerList = brokerList; + } + private static KafkaProducer kafkaProducer; + + public static int num = 0; + + /** + * 获取KafkaProducer实例 + */ + public static KafkaProducer getProducer() { + if (kafkaProducer == null) { + Properties props = new Properties(); + //xxx服务器ip + props.put("bootstrap.servers", brokerList); + //所有follower都响应了才认为消息提交成功,即"committed" + props.put("acks", "all"); + //retries = MAX 无限重试,直到你意识到出现了问题:) + props.put("retries", 3); + //producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 + props.put("batch.size", 16384); + //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms + //延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 + props.put("linger.ms", 1); + //producer可以用来缓存数据的内存大小。 + props.put("buffer.memory", 33554432); + props.put("key.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + kafkaProducer = new KafkaProducer(props); + } + return kafkaProducer; + } + + /** + * 关闭KafkaProducer实例 + */ + public static void closeProducer() { + if (kafkaProducer != null) { + log.info("----------close producer----------"); + kafkaProducer.close(); + kafkaProducer = null; + } + } + + public static void sendKafka(String resultData) { + KafkaProducer producer = getProducer(); + ProducerRecord se = new ProducerRecord(topic, resultData); + producer.send(se); + log.info("发送kafka成功"); +// num++; + } +} diff --git a/src/main/java/com/bfd/datacalculate/utils/Utils.java b/src/main/java/com/bfd/datacalculate/utils/Utils.java new file mode 100644 index 0000000..414116b --- /dev/null +++ b/src/main/java/com/bfd/datacalculate/utils/Utils.java @@ -0,0 +1,23 @@ +package com.bfd.datacalculate.utils; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.JSONPath; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author guowei + */ +@Component +public class Utils { + public static Object jsonParse(String key, Map data) { + String[] keySplit = key.split(":"); + String jsonPath = keySplit[1]; + String dataJson = (String) data.get(keySplit[0]); + JSONObject dataJsonObject = JSON.parseObject(dataJson); + Object dataValue = JSONPath.eval(dataJsonObject, jsonPath); + return dataValue; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..41f0feb --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,48 @@ +server: + port: 8384 +crawl: + kafka: + topic: produce_analyze + brokers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092 +#日志级别 +logging: + level: + com: + bfd: INFO + #日志路径 + log: + path: ./logs +spring: + redis: + host: 172.18.1.147 + port: 6379 + timeout: 10000 + database: 5 + jedis: + pool: + max-active: 8 # 连接池最大连接数(使用负值表示没有限制) + max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制) + max-idle: 8 # 连接池中的最大空闲连接 + min-idle: 2 # 连接池中的最小空闲连接 + boot: + admin: + client: + url: http://172.18.1.147:8001 + instance: + service-base-url: http://172.18.1.147:8384 + application: + name: 指标计算 +management: + endpoints: + web: + exposure: + include: "*" + endpoint: + health: + show-details: always + health: + elasticsearch: + enabled: false +zookeeper: + connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181 + publish-node: /analyze \ No newline at end of file diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..0c59240 --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,38 @@ + + + + + + + + + true + + ${logging.level} + + + ${logging.path}/crawlSchedule.log + + + + ${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd} + + 7 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + UTF-8 + + + + + + + + diff --git a/src/test/java/com/bfd/datacalculate/DataCalculateApplicationTests.java b/src/test/java/com/bfd/datacalculate/DataCalculateApplicationTests.java new file mode 100644 index 0000000..a6909a4 --- /dev/null +++ b/src/test/java/com/bfd/datacalculate/DataCalculateApplicationTests.java @@ -0,0 +1,13 @@ +package com.bfd.datacalculate; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class DataCalculateApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/target/.gitignore b/target/.gitignore new file mode 100644 index 0000000..4569837 --- /dev/null +++ b/target/.gitignore @@ -0,0 +1,2 @@ +/classes/ +/test-classes/