Browse Source

指标计算应用

master
55007 6 months ago
commit
73cc60e5bc
  1. 40
      .classpath
  2. 23
      .project
  3. 5
      .settings/org.eclipse.core.resources.prefs
  4. 8
      .settings/org.eclipse.jdt.core.prefs
  5. 4
      .settings/org.eclipse.m2e.core.prefs
  6. 1
      README.md
  7. 165
      pom.xml
  8. 15
      src/main/java/com/bfd/datacalculate/DataCalculateApplication.java
  9. 49
      src/main/java/com/bfd/datacalculate/config/GlobalConfig.java
  10. 39
      src/main/java/com/bfd/datacalculate/controller/ApiController.java
  11. 18
      src/main/java/com/bfd/datacalculate/model/DataEntity.java
  12. 51
      src/main/java/com/bfd/datacalculate/model/DataGroup.java
  13. 11
      src/main/java/com/bfd/datacalculate/service/CalculationStrategy.java
  14. 14
      src/main/java/com/bfd/datacalculate/service/DataMergeService.java
  15. 84
      src/main/java/com/bfd/datacalculate/service/DataProcessService.java
  16. 22
      src/main/java/com/bfd/datacalculate/service/impl/AverageStrategy.java
  17. 134
      src/main/java/com/bfd/datacalculate/service/impl/DataMergeServiceImpl.java
  18. 20
      src/main/java/com/bfd/datacalculate/service/impl/MaxStrategy.java
  19. 19
      src/main/java/com/bfd/datacalculate/service/impl/MinStrategy.java
  20. 32
      src/main/java/com/bfd/datacalculate/service/impl/SumStrategy.java
  21. 72
      src/main/java/com/bfd/datacalculate/service/impl/WeightedSumStrategy.java
  22. 81
      src/main/java/com/bfd/datacalculate/utils/KfkUtil.java
  23. 23
      src/main/java/com/bfd/datacalculate/utils/Utils.java
  24. 48
      src/main/resources/application.yml
  25. 38
      src/main/resources/logback-spring.xml
  26. 13
      src/test/java/com/bfd/datacalculate/DataCalculateApplicationTests.java
  27. 2
      target/.gitignore

40
.classpath

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

23
.project

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>datacalculate</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

5
.settings/org.eclipse.core.resources.prefs

@ -0,0 +1,5 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

8
.settings/org.eclipse.jdt.core.prefs

@ -0,0 +1,8 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

4
.settings/org.eclipse.m2e.core.prefs

@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

1
README.md

@ -0,0 +1 @@
指标计算

165
pom.xml

@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bfd</groupId>
<artifactId>datacalculate</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>datacalculate</name>
<description>datacalculate</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.2.4.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version> <!-- 版本号根据需要选择 -->
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>org.thymeleaf</groupId>
<artifactId>thymeleaf</artifactId>
<version>3.0.12.RELEASE</version> <!-- 版本号根据你需要使用的版本来确定 -->
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-client</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<!--不打入jar包的文件类型或者路径-->
<excludes>
<exclude>*.properties</exclude>
<exclude>*.yml</exclude>
<exclude>*.yaml</exclude>
</excludes>
<archive>
<manifest>
<!-- 执行的主程序路径 -->
<mainClass>com.bfd.datacalculate.DataCalculateApplication</mainClass>
<!--是否要把第三方jar放到manifest的classpath中-->
<addClasspath>true</addClasspath>
<!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/-->
<classpathPrefix>lib/</classpathPrefix>
<!-- 打包时 MANIFEST.MF 文件不记录的时间戳版本 -->
<useUniqueVersions>false</useUniqueVersions>
</manifest>
<manifestEntries>
<!-- 在 Class-Path 下添加配置文件的路径 -->
<Class-Path>config/</Class-Path>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<resources>
<!--把配置文件打包到指定路径-->
<resource>
<directory>src/main/resources/</directory>
<includes>
<include>*.properties</include>
<include>*.yml</include>
<exclude>*.yaml</exclude>
</includes>
</resource>
</resources>
<outputDirectory>${project.build.directory}/config</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

15
src/main/java/com/bfd/datacalculate/DataCalculateApplication.java

@ -0,0 +1,15 @@
package com.bfd.datacalculate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class DataCalculateApplication {
public static void main(String[] args) {
SpringApplication.run(DataCalculateApplication.class, args);
}
}

49
src/main/java/com/bfd/datacalculate/config/GlobalConfig.java

@ -0,0 +1,49 @@
package com.bfd.datacalculate.config;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.datacalculate.model.DataEntity;
import com.bfd.datacalculate.model.DataGroup;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author guowei
*/
@Component
public class GlobalConfig {
/** 全局锁 **/
public static final Object lock = new Object();
public static final long INACTIVITY_THRESHOLD = 60_000; // 1分钟的毫秒数
public static final Map<String, DataGroup> dataGroups = new ConcurrentHashMap<>();
public static final Map<String, JSONObject> originalParameters = new ConcurrentHashMap<>();
// function 映射
public static final Map<Integer, String> FUNCTION_MAP;
static {
Map<Integer, String> functionMap = new HashMap<>();
functionMap.put(1, "求和");
functionMap.put(2, "平均值");
functionMap.put(3, "最大值");
functionMap.put(4, "最小值");
functionMap.put(5, "加权求和");
FUNCTION_MAP = Collections.unmodifiableMap(functionMap); // 不可变的映射
}
// exception_handle 映射
public static final Map<Integer, String> EXCEPTION_HANDLE_MAP;
static {
Map<Integer, String> exceptionHandleMap = new HashMap<>();
exceptionHandleMap.put(1, "忽略异常值");
exceptionHandleMap.put(2, "用均值代替");
EXCEPTION_HANDLE_MAP = Collections.unmodifiableMap(exceptionHandleMap); // 不可变的映射
}
public static final String UNDERLINE = "_";
}

39
src/main/java/com/bfd/datacalculate/controller/ApiController.java

@ -0,0 +1,39 @@
package com.bfd.datacalculate.controller;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.datacalculate.config.GlobalConfig;
import com.bfd.datacalculate.service.DataMergeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @author guowei
*/
@RestController
@Slf4j
@RequestMapping(value = "/data")
@CrossOrigin(origins = "*", maxAge = 3600)
public class ApiController {
@Resource
DataMergeService dataMergeService;
/**
* 文档转换 Api
* @param jsonObject
* @return
*/
@RequestMapping(value = "/calculate", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
public String varAna(@RequestBody JSONObject jsonObject) {
log.info("数据合并计算参数:"+jsonObject);
// JSONObject data = jsonObject.getJSONObject("data");
// String businessKey = data.getString("businessKey");
synchronized(GlobalConfig.lock){
dataMergeService.merge(jsonObject);
}
return "success";
}
}

18
src/main/java/com/bfd/datacalculate/model/DataEntity.java

@ -0,0 +1,18 @@
package com.bfd.datacalculate.model;
import com.bfd.datacalculate.service.CalculationStrategy;
import com.bfd.datacalculate.service.impl.*;
import lombok.Data;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* @author guowei
*/
@Data
public class DataEntity {
private double value;
private Integer type; // 计算类型
}

51
src/main/java/com/bfd/datacalculate/model/DataGroup.java

@ -0,0 +1,51 @@
package com.bfd.datacalculate.model;
import com.bfd.datacalculate.service.CalculationStrategy;
import com.bfd.datacalculate.service.impl.*;
import lombok.Data;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* @author guowei
*/
@Data
public class DataGroup {
private List<DataEntity> dataList = new ArrayList<>();
private Instant lastUpdateTime = Instant.now();
private List<Double> weights = new ArrayList<>(); // 仅用于加权求和
// 根据 type 选择计算策略
public CalculationStrategy getStrategy() {
if (dataList.isEmpty()) {
throw new IllegalArgumentException("Data list is empty");
}
Integer type = dataList.get(0).getType();
CalculationStrategy strategy;
switch (type) {
case 1:
strategy = new SumStrategy();
break;
case 2:
strategy = new AverageStrategy();
break;
case 3:
strategy = new MaxStrategy();
break;
case 4:
strategy = new MinStrategy();
break;
case 5:
strategy = new WeightedSumStrategy(weights);
break;
default:
throw new IllegalArgumentException("Unknown calculation type");
}
return strategy;
}
}

11
src/main/java/com/bfd/datacalculate/service/CalculationStrategy.java

@ -0,0 +1,11 @@
package com.bfd.datacalculate.service;
import java.util.List;
/**
* @author guowei
*/
public interface CalculationStrategy {
double calculate(List<Double> data);
}

14
src/main/java/com/bfd/datacalculate/service/DataMergeService.java

@ -0,0 +1,14 @@
package com.bfd.datacalculate.service;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.stereotype.Service;
/**
* @author guowei
* 数据合并处理
*/
@Service
public interface DataMergeService {
void merge(JSONObject jsonObject);
}

84
src/main/java/com/bfd/datacalculate/service/DataProcessService.java

@ -0,0 +1,84 @@
package com.bfd.datacalculate.service;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.datacalculate.config.GlobalConfig;
import com.bfd.datacalculate.model.DataEntity;
import com.bfd.datacalculate.model.DataGroup;
import com.bfd.datacalculate.utils.KfkUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.*;
/**
* @author guowei
*/
@Service
@Slf4j
public class DataProcessService {
@Scheduled(fixedRate = 60_000) // 每分钟执行一次
public void scheduledProcessing() {
log.info("开始执行计算,有 {} 个数据组等待计算",GlobalConfig.dataGroups.isEmpty()? 0 : GlobalConfig.dataGroups.size());
if (GlobalConfig.dataGroups.isEmpty()) {
// 如果为空则无需进行任何处理
System.out.println("No data to process.");
return;
}
Instant now = Instant.now();
Iterator<Map.Entry<String, DataGroup>> iterator = GlobalConfig.dataGroups.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, DataGroup> entry = iterator.next();
DataGroup group = entry.getValue();
synchronized (group) {
if (now.minusMillis(GlobalConfig.INACTIVITY_THRESHOLD).isAfter(group.getLastUpdateTime())) {
log.info("数据组ID->{}:到达计算时间限制{},开始计算",entry.getKey(),GlobalConfig.INACTIVITY_THRESHOLD);
processData(entry.getKey(), group);
iterator.remove(); // 使用迭代器安全地移除元素
}
}
}
}
private void processData(String key, DataGroup group) {
Map result = new HashMap<>(16);
result.put("status",1);
result.put("message","成功");
Map results = new HashMap<>(16);
results.put("isLast",1);
results.put("id", IdUtil.randomUUID());
Double calculate = 0.0;
try {
// 处理数据的逻辑
CalculationStrategy strategy = group.getStrategy();
List<Double> doubles = new ArrayList<>();
List<DataEntity> dataList = group.getDataList();
for (DataEntity data : dataList) {
doubles.add(data.getValue());
}
calculate = strategy.calculate(doubles);
log.info("计算{},数组为{},结果={}", GlobalConfig.FUNCTION_MAP.get(dataList.get(0).getType()), JSON.toJSONString(doubles), calculate);
System.out.println(calculate);
}catch (Exception e){
e.printStackTrace();
log.error("计算数据异常",e);
result.put("status",2);
result.put("message","失败");
}
results.put("content",calculate.toString());
result.put("results",JSON.toJSONString(results));
JSONObject jsonObject = GlobalConfig.originalParameters.get(key);
jsonObject.put("result",result);
System.out.println(JSON.toJSONString(jsonObject));
KfkUtil.sendKafka(JSON.toJSONString(jsonObject));
log.info("发送kfk成功,result:{}",JSON.toJSONString(result));
GlobalConfig.originalParameters.remove(key);
}
}

22
src/main/java/com/bfd/datacalculate/service/impl/AverageStrategy.java

@ -0,0 +1,22 @@
package com.bfd.datacalculate.service.impl;
import com.bfd.datacalculate.service.CalculationStrategy;
import java.util.List;
/**
* @author guowei
*/
public class AverageStrategy implements CalculationStrategy {
@Override
public double calculate(List<Double> data) {
return data.stream()
.filter(d -> !Double.isNaN(d)) // 过滤掉 NaN
.mapToDouble(Double::doubleValue)
.average()
.orElse(0.0); // 如果过滤后没有数据返回 0.0
}
}

134
src/main/java/com/bfd/datacalculate/service/impl/DataMergeServiceImpl.java

@ -0,0 +1,134 @@
package com.bfd.datacalculate.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.bfd.datacalculate.config.GlobalConfig;
import com.bfd.datacalculate.model.DataEntity;
import com.bfd.datacalculate.model.DataGroup;
import com.bfd.datacalculate.service.DataMergeService;
import com.bfd.datacalculate.utils.Utils;
import jdk.nashorn.internal.objects.Global;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
/**
* @author guowei
*/
@Service
@Slf4j
public class DataMergeServiceImpl implements DataMergeService {
/**
* 合并多条数据
* @param jsonObject
*/
@Override
public void merge(JSONObject jsonObject) {
JSONObject data = jsonObject.getJSONObject("data");
String businessKey = data.getString("businessKey");
//输入
JSONObject input = jsonObject.getJSONObject("input");
log.info("合并计算 --> " + businessKey + ";data -->" + jsonObject);
Integer type = input.getInteger("type");
GlobalConfig.originalParameters.putIfAbsent(businessKey+GlobalConfig.UNDERLINE+type,jsonObject);
//函数
Integer exception_type = input.getInteger("exception_type");
double finalValue;
if (type!=5) {
String field = input.getString("field");
Object num = Utils.jsonParse(field, data);
try{
finalValue = Double.parseDouble((String) num);
}catch (Exception e) {
log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type));
//不是数值
if (exception_type==1){
return;
}else {
finalValue = Double.NaN;
}
}
// if ( !(num instanceof Number) ) {
// log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type));
// //不是数值
// if (exception_type==1){
// return;
// }else {
// finalValue = Double.NaN;
// }
// }else {
// // num 是数值将其转换为 double
// finalValue = ((Number) num).doubleValue();
// }
DataEntity dataEntity = new DataEntity();
dataEntity.setValue(finalValue);
dataEntity.setType(type);
receiveData(businessKey+GlobalConfig.UNDERLINE+type,dataEntity);
}else {
JSONArray fields = input.getJSONArray("fields");
for (Object value : fields) {
JSONObject item = JSONObject.parseObject(JSON.toJSONString(value));
String field = item.getString("field");
Object num = Utils.jsonParse(field, data);
Object coefficient = item.get("coefficient");
if (!(coefficient instanceof Number)){
log.info("系数 {} 不是数值,跳过",coefficient);
continue;
}
try{
finalValue = Double.parseDouble((String) num);
}catch (Exception e) {
log.info("值不是数值,异常值处理方式:{}",GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type));
//不是数值
if (exception_type==1){
return;
}else {
finalValue = Double.NaN;
}
}
// if (!(num instanceof Number)) {
// log.info("值不是数值,异常值处理方式:{}", GlobalConfig.EXCEPTION_HANDLE_MAP.get(exception_type));
// // 不是数值
// if (exception_type == 1) {
// continue; // 忽略异常值继续处理下一个
// } else {
// finalValue = Double.NaN; // NaN 替代
// }
// } else {
// // num 是数值将其转换为 double
// finalValue = ((Number) num).doubleValue();
// }
DataEntity dataEntity = new DataEntity();
dataEntity.setValue(finalValue);
dataEntity.setType(type);
receiveData(businessKey+GlobalConfig.UNDERLINE+type, dataEntity,((Number) coefficient).doubleValue());
}
}
}
// 接收数据
public void receiveData(String businessKey, DataEntity data,Double... coefficients) {
// 获取或创建 DataGroup
DataGroup dataGroup = GlobalConfig.dataGroups.computeIfAbsent(businessKey, k -> new DataGroup());
// 添加数据到 DataGroup
dataGroup.getDataList().add(data);
// 更新最后更新时间
dataGroup.setLastUpdateTime(Instant.now());
// 添加权重
if (coefficients != null) {
for (Double coefficient : coefficients) {
if (coefficient != null) {
dataGroup.getWeights().add(coefficient);
}
}
}
}
}

20
src/main/java/com/bfd/datacalculate/service/impl/MaxStrategy.java

@ -0,0 +1,20 @@
package com.bfd.datacalculate.service.impl;
import com.bfd.datacalculate.service.CalculationStrategy;
import java.util.List;
/**
* @author guowei
*/
public class MaxStrategy implements CalculationStrategy {
@Override
public double calculate(List<Double> data) {
return data.stream()
.filter(d -> !Double.isNaN(d)) // 过滤掉 NaN
.mapToDouble(Double::doubleValue)
.max()
.orElse(Double.NaN); // 如果过滤后没有数据返回 NaN
}
}

19
src/main/java/com/bfd/datacalculate/service/impl/MinStrategy.java

@ -0,0 +1,19 @@
package com.bfd.datacalculate.service.impl;
import com.bfd.datacalculate.service.CalculationStrategy;
import java.util.List;
/**
* @author guowei
*/
public class MinStrategy implements CalculationStrategy {
@Override
public double calculate(List<Double> data) {
return data.stream()
.filter(d -> !Double.isNaN(d)) // 过滤掉 NaN
.mapToDouble(Double::doubleValue)
.min()
.orElse(Double.NaN); // 如果过滤后没有数据返回 NaN
}
}

32
src/main/java/com/bfd/datacalculate/service/impl/SumStrategy.java

@ -0,0 +1,32 @@
package com.bfd.datacalculate.service.impl;
import com.bfd.datacalculate.service.CalculationStrategy;
import java.util.List;
/**
* @author guowei
*/
public class SumStrategy implements CalculationStrategy {
@Override
public double calculate(List<Double> data) {
// return data.stream().mapToDouble(Double::doubleValue).sum();
// 过滤出所有有效数值 NaN 并计算它们的总和和计数
double sum = data.stream()
.filter(d -> !Double.isNaN(d)) // 过滤掉 NaN
.mapToDouble(Double::doubleValue)
.sum();
long count = data.stream()
.filter(d -> !Double.isNaN(d)) // 统计有效数值的数量
.count();
// 如果没有有效数值返回 0 避免除以 0 的情况
double mean = count > 0 ? sum / count : 0.0;
// NaN 值替换为均值然后进行最终求和
return data.stream()
.mapToDouble(d -> Double.isNaN(d) ? mean : d) // 替换 NaN 为均值
.sum();
}
}

72
src/main/java/com/bfd/datacalculate/service/impl/WeightedSumStrategy.java

@ -0,0 +1,72 @@
package com.bfd.datacalculate.service.impl;
import com.alibaba.fastjson2.JSON;
import com.bfd.datacalculate.service.CalculationStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* @author guowei
*/
@Slf4j
public class WeightedSumStrategy implements CalculationStrategy {
private final List<Double> weights;
public WeightedSumStrategy(List<Double> weights) {
this.weights = weights;
}
@Override
public double calculate(List<Double> data) {
log.info("加权求和-> 数组:{},系数:{}", JSON.toJSONString(data),JSON.toJSONString(weights));
if (data.size() != weights.size()) {
log.error("Data and weights size must match.");
return 0;
}
double weightedSum = 0.0;
double totalWeight = 0.0;
List<Double> validData = new ArrayList<>();
// 找到有效的数值并计算均值
for (Double value : data) {
if (!Double.isNaN(value)) {
validData.add(value);
}
}
// 计算均值
double meanValue = validData.stream()
.mapToDouble(Double::doubleValue)
.average()
.orElse(0.0);
// 用均值替代 NaN
for (int i = 0; i < data.size(); i++) {
Double value = data.get(i);
Double weight = weights.get(i);
if (Double.isNaN(value)) {
value = meanValue; // 替换 NaN 为均值
}
if (Double.isNaN(weight)) {
weight = 0.0; // 如果权重是 NaN设置为 0 或其他值
}
weightedSum += value * weight;
totalWeight += weight;
}
// 如果没有有效的权重则返回 0 或其他值
if (totalWeight == 0) {
return 0.0;
}
//加权和
// return weightedSum;
// 如果需要归一化加权和使用总权重进行计算
log.info("加权和:{},总权重:{}", weightedSum, totalWeight);
return weightedSum / totalWeight;
}
}

81
src/main/java/com/bfd/datacalculate/utils/KfkUtil.java

@ -0,0 +1,81 @@
package com.bfd.datacalculate.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
* @author guowei
* kfk工具类
*/
@Component
@Slf4j
public class KfkUtil {
private static String topic;
private static String brokerList;
@Value("${crawl.kafka.topic}")
public void setTopic(String topic) {
KfkUtil.topic = topic;
}
@Value("${crawl.kafka.brokers}")
public void setBrokerList(String brokerList) {
KfkUtil.brokerList = brokerList;
}
private static KafkaProducer<String, String> kafkaProducer;
public static int num = 0;
/**
* 获取KafkaProducer实例
*/
public static KafkaProducer<String, String> getProducer() {
if (kafkaProducer == null) {
Properties props = new Properties();
//xxx服务器ip
props.put("bootstrap.servers", brokerList);
//所有follower都响应了才认为消息提交成功"committed"
props.put("acks", "all");
//retries = MAX 无限重试直到你意识到出现了问题:)
props.put("retries", 3);
//producer将试图批处理消息记录以减少请求次数.默认的批量处理消息字节数
props.put("batch.size", 16384);
//batch.size当批量的数据大小达到设定值后就会立即发送不顾下面的linger.ms
//延迟1ms发送这项设置将通过增加小的延迟来完成--不是立即发送一条记录producer将会等待给定的延迟时间以允许其他消息记录发送这些消息记录可以批量处理
props.put("linger.ms", 1);
//producer可以用来缓存数据的内存大小
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
/**
* 关闭KafkaProducer实例
*/
public static void closeProducer() {
if (kafkaProducer != null) {
log.info("----------close producer----------");
kafkaProducer.close();
kafkaProducer = null;
}
}
public static void sendKafka(String resultData) {
KafkaProducer<String, String> producer = getProducer();
ProducerRecord<String, String> se = new ProducerRecord<String, String>(topic, resultData);
producer.send(se);
log.info("发送kafka成功");
// num++;
}
}

23
src/main/java/com/bfd/datacalculate/utils/Utils.java

@ -0,0 +1,23 @@
package com.bfd.datacalculate.utils;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author guowei
*/
@Component
public class Utils {
public static Object jsonParse(String key, Map data) {
String[] keySplit = key.split(":");
String jsonPath = keySplit[1];
String dataJson = (String) data.get(keySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
Object dataValue = JSONPath.eval(dataJsonObject, jsonPath);
return dataValue;
}
}

48
src/main/resources/application.yml

@ -0,0 +1,48 @@
server:
port: 8384
crawl:
kafka:
topic: produce_analyze
brokers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092
#日志级别
logging:
level:
com:
bfd: INFO
#日志路径
log:
path: ./logs
spring:
redis:
host: 172.18.1.147
port: 6379
timeout: 10000
database: 5
jedis:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 2 # 连接池中的最小空闲连接
boot:
admin:
client:
url: http://172.18.1.147:8001
instance:
service-base-url: http://172.18.1.147:8384
application:
name: 指标计算
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
health:
elasticsearch:
enabled: false
zookeeper:
connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181
publish-node: /analyze

38
src/main/resources/logback-spring.xml

@ -0,0 +1,38 @@
<configuration>
<!-- 属性文件:在properties文件中找到对应的配置项 -->
<springProperty scope="context" name="logging.path" source="logging.log.path"/>
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 -->
<!-- <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{HH:mm:ss.SSS} %-5level %logger{80} - %msg%n</Pattern>
</encoder>
</appender> -->
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.path}/crawlSchedule.log
<!-- ${logging.path}/sendKafka.log -->
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.path}/crawlSchedule.log.%d{yyyy-MM-dd}</FileNamePattern>
<!-- <FileNamePattern>${logging.path}/sendKafka.log.%d{yyyy-MM-dd}</FileNamePattern> -->
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<!-- <appender-ref ref="STDOUT"/> -->
</root>
</configuration>

13
src/test/java/com/bfd/datacalculate/DataCalculateApplicationTests.java

@ -0,0 +1,13 @@
package com.bfd.datacalculate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DataCalculateApplicationTests {
@Test
void contextLoads() {
}
}

2
target/.gitignore

@ -0,0 +1,2 @@
/classes/
/test-classes/
Loading…
Cancel
Save