From 6621b5f0bfea97b55b10543cb410cf4753d52750 Mon Sep 17 00:00:00 2001 From: 55007 <55007@maojian> Date: Tue, 7 Jan 2025 17:20:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=87=E6=BB=A4=E5=99=A8=E5=BA=94=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 34 +++ README.md | 1 + pom.xml | 130 ++++++++ .../crawl/datafilter/DataFilterApplication.java | 24 ++ .../com/bfd/crawl/datafilter/bean/Constants.java | 36 +++ .../com/bfd/crawl/datafilter/bean/ResponsePo.java | 59 ++++ .../config/AsyncThreadConfiguration.java | 48 +++ .../controller/DataFilterController.java | 39 +++ .../bfd/crawl/datafilter/dao/FilterTypeDao.java | 14 + .../bfd/crawl/datafilter/entity/FilterType.java | 49 +++ .../bfd/crawl/datafilter/enums/ResponseCode.java | 32 ++ .../exception/GlobalExceptionHandler.java | 32 ++ .../crawl/datafilter/service/HandlerService.java | 331 +++++++++++++++++++++ .../bfd/crawl/datafilter/service/SendService.java | 47 +++ .../bfd/crawl/datafilter/service/StartServcie.java | 79 +++++ .../com/bfd/crawl/datafilter/util/DataUtil.java | 61 ++++ .../com/bfd/crawl/datafilter/util/DateUtil.java | 72 +++++ .../com/bfd/crawl/datafilter/util/QueueUtil.java | 16 + .../com/bfd/crawl/datafilter/util/StringUtil.java | 94 ++++++ src/main/resources/application.yml | 64 ++++ src/main/resources/logback-spring.xml | 36 +++ .../datafilter/DataFilterApplicationTests.java | 13 + 22 files changed, 1311 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/bean/Constants.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java create 
mode 100644 src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/entity/FilterType.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/enums/ResponseCode.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/exception/GlobalExceptionHandler.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/service/HandlerService.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/service/SendService.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/util/DateUtil.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/util/QueueUtil.java create mode 100644 src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/logback-spring.xml create mode 100644 src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf60db2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,34 @@ +HELP.md +target/ +logs/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..943dfea --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +过滤器 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..0786b12 --- /dev/null +++ b/pom.xml @@ -0,0 +1,130 @@ + + + 4.0.0 + com.bfd.crawl + dataFilter + 0.0.1-SNAPSHOT + dataFilter + dataFilter + + 
1.8 + UTF-8 + UTF-8 + 2.2.4.RELEASE + + + + + de.codecentric + spring-boot-admin-client + 2.2.4 + + + org.springframework.boot + spring-boot-starter + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-web + + + + org.redisson + redisson-spring-boot-starter + 3.13.6 + + + org.springframework.boot + spring-boot-starter-data-redis + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + mysql + mysql-connector-java + runtime + + + + com.alibaba + fastjson + 2.0.17 + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + com.bfd.util + pauseTool + 1.0 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + dataFilter-0.0.1-SNAPSHOT + + + org.springframework.boot + spring-boot-maven-plugin + 2.4.1 + + true + + + + + repackage + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + diff --git a/src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java b/src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java new file mode 100644 index 0000000..340e21a --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java @@ -0,0 +1,24 @@ +package com.bfd.crawl.datafilter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableAsync +@EnableScheduling +/** + * @author:jinming + * @className:DataFilterApplication + * @version:1.0 + * @description: + * @Date:2023/7/31 17:53 + */ +public class DataFilterApplication { + + public static void main(String[] args) { + 
SpringApplication.run(DataFilterApplication.class, args); + } + +} diff --git a/src/main/java/com/bfd/crawl/datafilter/bean/Constants.java b/src/main/java/com/bfd/crawl/datafilter/bean/Constants.java new file mode 100644 index 0000000..17424bc --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/bean/Constants.java @@ -0,0 +1,36 @@ +package com.bfd.crawl.datafilter.bean; + +/** + * @author:jinming + * @className:Constants + * @version:1.0 + * @description: + * @Date:2023/7/14 10:41 + */ +public class Constants { + /** + * 查询数据时间 + */ + public static String TIME = "time"; + + /** + * 查询数据开始时间 + */ + public static String START_TIME = "startTime"; + + /** + * 查询数据结束时间 + */ + public static String END_TIME = "endTime"; + + /** + * 查询排除词 + */ + public static String EXCLUSION_WORDS = "exclusionWords"; + + /** + * 查询包含命中关键词 + */ + public static String INCLUD_WORDS = "includWords"; + +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java b/src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java new file mode 100644 index 0000000..7677269 --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java @@ -0,0 +1,59 @@ +package com.bfd.crawl.datafilter.bean; + + +import com.bfd.crawl.datafilter.enums.ResponseCode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author:jinming + * @className:ResponsePo + * @version:1.0 + * @description: + * @Date:2023/4/3 17:23 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ResponsePo { + /** + * 响应码 + */ + private int code; + + /** + * 正常放 返回数据 的JSON串 + */ + private Object data; + + /** + * 提示消息 + */ + private String message; + + public static ResponsePo success() { + return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage()); + } + + public static ResponsePo error() { + return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage()); + } + + public 
static ResponsePo setStatus(int code, String message) { + ResponsePo resultBean = new ResponsePo(); + resultBean.code = code; + resultBean.message = message; + return resultBean; + } + public ResponsePo(int code, String message) { + this.code = code; + this.message = message; + this.data = data; + } + public ResponsePo(ResponseCode responseCode){ + this.code = responseCode.getCode(); + this.message = responseCode.getMessage(); + this.data = data; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java b/src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java new file mode 100644 index 0000000..08a5aef --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java @@ -0,0 +1,48 @@ +package com.bfd.crawl.datafilter.config; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +/** + * @author jinming + * @version 1.0 + * @className AsyncThreadConfiguration + * @Date 2022/2/17 18:37 + */ +@Configuration +@EnableAsync +public class AsyncThreadConfiguration { + @Bean(name = "asyncExecutor") + public Executor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(500); + // 并发线程的数量限制为2 + executor.setMaxPoolSize(500); + // 线程队列 + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("dataFilter-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } + @Bean(name = "sendExecutor") + public Executor sendExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(500); + // 并发线程的数量限制为2 + executor.setMaxPoolSize(500); + // 线程队列 + 
executor.setQueueCapacity(500); + executor.setThreadNamePrefix("sendData-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } +} diff --git a/src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java b/src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java new file mode 100644 index 0000000..67ed787 --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java @@ -0,0 +1,39 @@ +package com.bfd.crawl.datafilter.controller; + +import com.alibaba.fastjson.JSON; +import com.bfd.crawl.datafilter.bean.ResponsePo; +import com.bfd.crawl.datafilter.enums.ResponseCode; +import com.bfd.crawl.datafilter.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * @author:jinming + * @className:DataFilterController + * @version:1.0 + * @description: + * @Date:2023/7/26 11:21 + */ +@RestController +@RequestMapping("/handlerdata") +@Slf4j +public class DataFilterController { + @PostMapping("/filter") + public ResponsePo documentFeedback(@RequestBody String dataJson) { + + ResponsePo responsePo = ResponsePo.success(); + log.info("新增任务:" + dataJson); + try { + QueueUtil.taskQueue.put(dataJson); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return responsePo; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java b/src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java new file mode 100644 index 0000000..2c344ff --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java @@ -0,0 +1,14 @@ +package com.bfd.crawl.datafilter.dao; + +import com.bfd.crawl.datafilter.entity.FilterType; 
/**
 * Result codes used in API responses, each paired with a default message.
 *
 * @author jinming
 * @version 1.0
 */
public enum ResponseCode {
    /** The operation succeeded. */
    SUCCESS(200, "操作成功"),
    /** The request parameters were invalid. */
    FAILURE(400, "参数错误"),
    /** An unexpected server-side error occurred. */
    INTERNAL_SERVER_ERROR(500, "服务器内部错误"),
    /** The file type is not supported. */
    TYPE_NOT_SUPPORT(601, "文件类型不支持");

    // Enum constants are shared singletons, so their state must be immutable.
    private final int code;
    private final String message;

    ResponseCode(int code, String message) {
        this.code = code;
        this.message = message;
    }

    /**
     * @return the numeric result code
     */
    public int getCode() {
        return code;
    }

    /**
     * @return the default message for this code
     */
    public String getMessage() {
        return message;
    }
}
b/src/main/java/com/bfd/crawl/datafilter/service/HandlerService.java @@ -0,0 +1,331 @@ +package com.bfd.crawl.datafilter.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSONPath; +import com.bfd.crawl.datafilter.dao.FilterTypeDao; +import com.bfd.crawl.datafilter.entity.FilterType; +import com.bfd.crawl.datafilter.util.DataUtil; +import com.bfd.crawl.datafilter.util.DateUtil; +import com.bfd.crawl.datafilter.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author:jinming + * @className:HandlerService + * @version:1.0 + * @description: + * @Date:2023/7/26 11:29 + */ +@Service +@Slf4j +public class HandlerService { + @Autowired + private FilterTypeDao filterTypeDao; + + + @Async("asyncExecutor") + public void run() { + while (true) { + if (QueueUtil.taskQueue.size() > 0) { + String dataJson = null; + try { + dataJson = QueueUtil.taskQueue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Map parse = null; + try { + parse = (Map) JSON.parse(dataJson); + } catch (Exception e) { + e.printStackTrace(); + } + try { + Map data = (Map) parse.get("data"); + int id = (int) parse.get("id"); + log.info("任务:" + id + "已开始处理"); + boolean needSend = false; + Map admin = (Map) parse.get("input"); + Map dataMap = (Map) parse.get("data"); + log.info("任务:" + id + "的判断条件为:" + JSON.toJSONString(admin)); + boolean caseSensitive = true; + try { + caseSensitive = (boolean) admin.get("caseSensitive"); + } catch (Exception e) { + + } + List> conditions = (List>) admin.get("or"); + for (List condition : conditions) { + for 
(Map andCondition : condition) { + int conditionId = (int) andCondition.get("id"); + FilterType filterType = filterTypeDao.getFilterTypeById(conditionId); + int typeId = filterType.getParentId(); + switch (typeId) { + case 6000: + needSend = textTypeHandler(conditionId, andCondition, dataMap, caseSensitive); + break; + case 6001: + needSend = numberTypeHandler(conditionId, andCondition, dataMap); + break; + case 6002: + needSend = dateTypeHandler(conditionId, andCondition, dataMap); + break; + case 6025: + needSend = normalHandler(conditionId, andCondition, dataMap); + break; + default: + break; + } + if (!needSend) { + break; + } + } + if (needSend) { + break; + } + } + if (needSend) { + log.info("任务:" + id + "满足" + JSON.toJSONString(admin) + "条件,发送数据到指定Kafka"); + Map result = new HashMap(32); + Map resultMap = new HashMap(32); + resultMap.put("isLast", 1); + resultMap.put("content", "数据满足条件过滤成功"); + result.put("results", JSON.toJSONString(resultMap)); + result.put("status", 1); + result.put("message", ""); + parse.put("result", result); + String message = JSON.toJSONString(parse); + QueueUtil.sendQueue.put(message); + } else { + log.info("任务:" + id + "不满足" + JSON.toJSONString(admin) + "条件,发送数据到指定Kafka"); +// Map result = new HashMap(32); +// Map resultMap = new HashMap(32); +// resultMap.put("isLast", 1); +// resultMap.put("content", "数据不满足条件过滤"); +// resultMap.put("status", 3); +// result.put("results", JSON.toJSONString(resultMap)); +// result.put("message", ""); +// parse.put("result", result); +// String message = JSON.toJSONString(parse); +// QueueUtil.sendQueue.put(message); + } + + } catch (Throwable e) { + log.error("处理程序发生异常:", e); + log.error("任务发生异常:{}", dataJson); + e.printStackTrace(); + Map result = new HashMap(32); + Map resultMap = new HashMap(32); + resultMap.put("isLast", 1); + resultMap.put("content", "数据满足条件过滤成功"); + result.put("results", JSON.toJSONString(resultMap)); + result.put("status", 2); + result.put("message", "未知异常"); + 
parse.put("result", result); +// String message = JSON.toJSONString(parse); +// try { +// QueueUtil.sendQueue.put(message); +// } catch (InterruptedException ex) { +// ex.printStackTrace(); +// } + } + } else { + log.info("任务队列为空,休眠10秒"); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + } + + private boolean normalHandler(int id, Map conditionMap, Map dataMap) { + boolean condition = false; + String key = (String) conditionMap.get("key"); + String value = (String) conditionMap.get("value"); + Object dataValue = DataUtil.getValue(key, dataMap); + switch (id) { + case 6024: + try { + dataValue.getClass(); + } catch (NullPointerException e) { + condition = true; + } + break; + case 6026: + Map valueMap = (Map) DataUtil.getValue(key, dataMap); + condition = valueMap.containsKey(value); + break; + case 6027: + List valueList = (List) DataUtil.getValue(key, dataMap); + condition = valueList.size() == 0; + break; + case 6028: + List objectList = (List) DataUtil.getValue(key, dataMap); + condition = objectList.size() != 0; + default: + break; + + } + return condition; + } + + private boolean textTypeHandler(int id, Map conditionMap, Map dataMap, boolean caseSensitive) { + boolean condition = false; + String key = (String) conditionMap.get("key"); + String value = (String) conditionMap.get("value"); + String dataValue = (String) DataUtil.getValue(key, dataMap); + switch (id) { + case 6003: + if (caseSensitive) { + condition = dataValue.equals(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = dataValue.equals(value.toLowerCase()); + } + break; + case 6004: + if (caseSensitive) { + condition = dataValue.contains(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = dataValue.contains(value.toLowerCase()); + } + break; + case 6005: + if (caseSensitive) { + condition = dataValue.startsWith(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = 
dataValue.startsWith(value.toLowerCase()); + } + break; + case 6006: + if (caseSensitive) { + condition = dataValue.endsWith(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = dataValue.endsWith(value.toLowerCase()); + } + break; + case 6007: + Pattern p = Pattern.compile(value); + Matcher m = p.matcher(dataValue); + condition = m.find(); + break; + case 6020: + if (caseSensitive) { + condition = !dataValue.equals(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = !dataValue.equals(value.toLowerCase()); + } + break; + case 6023: + if (caseSensitive) { + condition = !dataValue.contains(value); + } else { + dataValue = dataValue.toLowerCase(); + condition = !dataValue.contains(value.toLowerCase()); + } + break; + + default: + break; + } + return condition; + } + + private boolean numberTypeHandler(int id, Map conditionMap, Map dataMap) { + boolean condition = false; + String key = (String) conditionMap.get("key"); + int value = Integer.parseInt(conditionMap.get("value").toString()); + int dataValue = Integer.parseInt(String.valueOf(DataUtil.getValue(key, dataMap))); + switch (id) { + case 6008: + condition = dataValue > value; + break; + case 6009: + condition = dataValue < value; + break; + case 6010: + condition = dataValue == value; + break; + case 6011: + condition = dataValue >= value; + break; + case 6012: + condition = dataValue <= value; + break; + case 6021: + condition = dataValue != value; + break; + default: + break; + } + return condition; + } + + private boolean dateTypeHandler(int id, Map conditionMap, Map dataMap) { + String format = "yyyy-MM-dd HH:mm:ss"; + boolean condition = false; + String key = (String) conditionMap.get("key"); + long valueTimestamp = 0; + try { + String value = (String) conditionMap.get("value"); + valueTimestamp = convertDateTimeToTimestamp(value, format); + } catch (Exception e) { + e.printStackTrace(); + } + String dataValue = (String) DataUtil.getValue(key, dataMap); + + long 
dataValueTimestamp = convertDateTimeToTimestamp(dataValue, format); + switch (id) { + case 6013: + condition = dataValueTimestamp > valueTimestamp; + break; + case 6014: + condition = dataValueTimestamp < valueTimestamp; + break; + case 6022: + String theDayBegin = DateUtil.theDayBegin(); + String theDayEnd = DateUtil.theDayEnd(); + condition = convertDateTimeToTimestamp(theDayBegin, format) < dataValueTimestamp && dataValueTimestamp < convertDateTimeToTimestamp(theDayEnd, format); + break; + default: + break; + + } + return condition; + } + + private static long convertDateTimeToTimestamp(String datetimeString, String format) { + SimpleDateFormat sdf = new SimpleDateFormat(format); + try { + Date date = sdf.parse(datetimeString); + return date.getTime(); + } catch (Exception e) { + e.printStackTrace(); + return 0; + } + } + + public static void main(String[] args) { + String json = "{\"charset\":\"UTF-8\",\"iid\":\"57dc5083f49dd088c413e1f28572e83a\",\"length\":21909,\"tmpl_id\":3838,\"type\":\"newslist\",\"version\":\"6\",\"news_id\":\"57dc5083f49dd088c413e1f28572e83a\",\"url\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=0&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"nextpage\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"host\":\"172.18.1.182\",\"category\":1,\"items\":[{\"link\":{\"iid\":\"93b44cdcad298b82cecd5e0c09cb078d\",\"link\":\"https://new.qq.com/rain/a/20231031A012QT00\",\"linktype\":\"newscontent\",\"rawlink\":\"https://new.qq.com/rain/a/20231031A012QT00\"},\"posttime\":\"2023-10-31 
08:02:07\",\"title\":\"\u200B中学周末校内托管服务,确定符合“双减”精神?\"}],\"tasks\":[{\"iid\":\"93b44cdcad298b82cecd5e0c09cb078d\",\"link\":\"https://new.qq.com/rain/a/20231031A012QT00\",\"linktype\":\"newscomment\",\"rawlink\":\"rain/a/20231031A012QT00\"},{\"iid\":\"f3d8ce4383320a7dc7c6270c74297272\",\"link\":\"https://new.qq.com/rain/a/20231030A09KEE00\",\"linktype\":\"newscontent\",\"rawlink\":\"https://new.qq.com/rain/a/20231030A09KEE00\"},{\"iid\":\"6a275c516abc33e2b216a08fb3b8668e\",\"link\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"linktype\":\"newslist\",\"rawlink\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\"}],\"cid\":\"NtengxunNews\"}"; + JSONObject jsonObject = JSON.parseObject(json); + System.out.println(JSONPath.eval(jsonObject, "$$$$.['tasks'][0]['iissssd']")); + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/datafilter/service/SendService.java b/src/main/java/com/bfd/crawl/datafilter/service/SendService.java new file mode 100644 index 0000000..21f93ef --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/service/SendService.java @@ -0,0 +1,47 @@ +package com.bfd.crawl.datafilter.service; + + +import com.bfd.crawl.datafilter.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +/** + * @author:jinming + * @className:SendService + * @version:1.0 + * @description: + * @Date:2023/7/31 17:53 + */ 
+@Slf4j +@Service +public class SendService { + @Value("${send.topic}") + private String topic; + @Autowired + private KafkaTemplate kafkaTemplate; + + @Async("sendExecutor") + void sendToKafka() { + while (true) { + if (QueueUtil.sendQueue.size() > 0) { + try { + String message = QueueUtil.sendQueue.take(); + kafkaTemplate.send(topic,message); + } catch (Exception e) { + e.printStackTrace(); + } + }else { + log.info("任务队列为空,休眠3秒"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java b/src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java new file mode 100644 index 0000000..4f54960 --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java @@ -0,0 +1,79 @@ +package com.bfd.crawl.datafilter.service; + +import com.bfd.crawl.datafilter.util.QueueUtil; +import com.bfd.util.PauseTool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author:jinming + * @className:StartServcie + * @version:1.0 + * @description: + * @Date:2023/7/31 17:14 + */ +@Service +@Slf4j +@Order(value = 1) +public class StartServcie implements ApplicationRunner { + @Value("${thread.handler}") + private int handlerNumber; + + @Value("${thread.send}") + private int sendNumber; + @Autowired + private HandlerService handlerService; + @Autowired + private SendService sendService; +// @Value("${zookeeper.connection-string}") +// private String connectionString; +// @Value("${zookeeper.publish-node}") +// private 
String nodePath; +// @Resource +// private StringRedisTemplate stringRedisTemplate; + + @Override + public void run(ApplicationArguments args) throws Exception { +// PauseTool pauseTool = new PauseTool(); +// pauseTool.initializeRedisCache(stringRedisTemplate); +// pauseTool.setupZookeeperListener(connectionString, nodePath); + for (int i = 0; i < handlerNumber; i++) { + log.info("处理服务线程" + i + "已启动 "); + handlerService.run(); + } + for (int i = 0; i < sendNumber; i++) { + log.info("发送服务线程" + i + "已启动 "); + sendService.sendToKafka(); + } + // 创建一个匿名内部类实现了Runnable接口 + Runnable myRunnable = new Runnable() { + @Override + public void run() { + // 在这里定义线程要执行的任务 + while (true) { + log.info("任务队列长度为" + QueueUtil.taskQueue.size()); + log.info("发送队列长度为" + QueueUtil.taskQueue.size()); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }; + // 创建一个新的线程,并将Runnable对象传递给Thread构造函数 + Thread myThread = new Thread(myRunnable); + // 启动线程 + myThread.start(); + + + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java b/src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java new file mode 100644 index 0000000..f801abc --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java @@ -0,0 +1,61 @@ +package com.bfd.crawl.datafilter.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSONPath; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author:jinming + * @className:DataUtil + * @version:1.0 + * @description: 获取dataValue的值 + * @Date:2023/11/1 9:54 + */ +@Slf4j +public class DataUtil { + /** + * @param key 传入的key + * @param dataMap 数据map + * @return 根据传入的参数进行判断解析,返回正确的dataValue + */ + public static Object getValue(String key, Map dataMap) { + try { + //公式为空直接就返回 + if (!StringUtil.hasValue(key)) { + return ""; + } + Object dataValue; + String isJson = 
/**
 * Date/time helpers that produce and consume timestamps in the
 * {@code yyyy-MM-dd HH:mm:ss} text form, interpreted in the JVM's default time zone.
 *
 * <p>All methods are static and keep no mutable shared state: the legacy
 * {@link SimpleDateFormat} is instantiated per call, and the shared
 * {@code DateTimeFormatter} is immutable.
 *
 * @author jinming
 * @version 1.0
 * @className DateUtil
 * @Date 2022/7/29 15:49
 */
public class DateUtil {

    /** Text pattern used by every full-timestamp method in this class. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Immutable formatter for {@link #DATE_TIME_PATTERN}; safe to share between threads. */
    private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
            java.time.format.DateTimeFormatter.ofPattern(DATE_TIME_PATTERN);

    /** How far before today's midnight {@link #theTaskBegin()} reaches (previously 7200000 ms). */
    private static final long TASK_LEAD_HOURS = 2L;

    /**
     * Returns today's date followed by {@code 00:00:00}.
     *
     * @return e.g. {@code 2025-01-07 00:00:00}
     */
    public static String theDayBegin() {
        return java.time.LocalDate.now().atStartOfDay().format(DATE_TIME_FORMATTER);
    }

    /**
     * Returns today's date followed by {@code 23:59:59}.
     *
     * @return e.g. {@code 2025-01-07 23:59:59}
     */
    public static String theDayEnd() {
        return java.time.LocalDate.now().atTime(23, 59, 59).format(DATE_TIME_FORMATTER);
    }

    /**
     * Returns the instant two hours before the start of today, i.e. normally
     * {@code 22:00:00} of the previous day.
     *
     * <p>The previous implementation formatted today's date to text, parsed it back and
     * subtracted 7200000 ms; the result is now computed directly, so there is no parse step
     * that could fail and no empty-string fallback.
     *
     * @return the formatted timestamp, never empty
     */
    public static String theTaskBegin() {
        return java.time.LocalDate.now()
                .atStartOfDay(java.time.ZoneId.systemDefault())
                // ZonedDateTime arithmetic in hours works on the instant time-line,
                // matching the old millisecond subtraction.
                .minusHours(TASK_LEAD_HOURS)
                .format(DATE_TIME_FORMATTER);
    }

    /**
     * Returns the start of the first day of the current month.
     *
     * @return e.g. {@code 2025-01-01 00:00:00}, never empty
     */
    public static String theMonthBegin() {
        return java.time.LocalDate.now()
                .withDayOfMonth(1)
                .atStartOfDay(java.time.ZoneId.systemDefault())
                .format(DATE_TIME_FORMATTER);
    }

    /**
     * Parses a {@code yyyy-MM-dd HH:mm:ss} timestamp into epoch milliseconds.
     *
     * <p>Parsing still goes through {@link SimpleDateFormat} so that inputs accepted before
     * (it parses leniently by default) keep being accepted.
     *
     * @param time timestamp text in the default time zone
     * @return milliseconds since the epoch
     * @throws IllegalArgumentException if {@code time} cannot be parsed; previously this
     *                                  surfaced as a {@link NullPointerException} after the
     *                                  parse error had been printed and discarded
     * @throws NullPointerException     if {@code time} is {@code null}
     */
    public static long timeToLong(String time) {
        try {
            Date parsed = new SimpleDateFormat(DATE_TIME_PATTERN).parse(time);
            return parsed.getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "Cannot parse '" + time + "' as " + DATE_TIME_PATTERN, e);
        }
    }

    /**
     * Formats epoch milliseconds as {@code yyyy-MM-dd HH:mm:ss} in the default time zone.
     *
     * @param time milliseconds since the epoch
     * @return the formatted timestamp
     */
    public static String fomateTime(long time) {
        return new SimpleDateFormat(DATE_TIME_PATTERN).format(new Date(time));
    }

    /** Manual check: prints {@link #theTaskBegin()} to standard output. */
    public static void main(String[] args) {
        System.out.println(DateUtil.theTaskBegin());
    }
}
a/src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java b/src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java new file mode 100644 index 0000000..4cea4b4 --- /dev/null +++ b/src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java @@ -0,0 +1,94 @@ +package com.bfd.crawl.datafilter.util; + + +import lombok.extern.slf4j.Slf4j; + +import java.security.MessageDigest; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author jinming + * @version 1.0 + * @className StringUtile + * @Date 2022/1/21 11:46 + */ +@Slf4j +public class StringUtil { + public static boolean hasValue(String str) { + return str != null && !"".equals(str.trim()); + } + + public static String getRegexGroup(String regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Pattern p = Pattern.compile(regex); + Matcher m = p.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + } + + return resultStr; + } + + public static Set getEmailAddress(String message) { + Set emailList = new HashSet<>(); + Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+"); + Matcher m = pattern.matcher(message); + while (m.find()) { + emailList.add(m.group(0)); + } + return emailList; + } + public static String getMd5(String string) { + try { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] bs = md5.digest(string.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(40); + for (byte x : bs) { + if ((x & 0xff) >> 4 == 0) { + sb.append("0").append(Integer.toHexString(x & 0xff)); + } else { + sb.append(Integer.toHexString(x & 0xff)); + } + } + return sb.toString(); + } catch (Exception e) { + //LOG.error("获取md5异常", e); + return "nceaform" + System.currentTimeMillis(); + } + } + + public static String removeAllHtmlTags(String str) { + return hasValue(str) ? 
str.replaceAll("<[^<>]+?>", "") : ""; + } + + public static String getRegexGroup(Pattern regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Matcher m = regex.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + log.error(regex + " parser error!"); + } + + return resultStr; + } + + public static String getStrByPattern(String str, String regex) { + Pattern pattern = Pattern.compile(regex); + Matcher m = pattern.matcher(str); + return m.find() ? m.group(0) : ""; + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..529dc14 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,64 @@ +server: + port: 7088 +spring: + application: + name: 过滤器 + boot: + admin: + client: + health: + timeout: 10s + url: http://172.16.12.55:8001 + instance: + service-base-url: http://172.16.12.56:7088 + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + username: crawl + password: crawl123 + url: jdbc:mysql://172.26.11.110:3306/kyyzgpt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false + + redis: + host: 172.24.12.126 + port: 6379 + timeout: 10000 + database: 7 + jedis: + pool: + max-active: 8 # 连接池最大连接数(使用负值表示没有限制) + max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制) + max-idle: 8 # 连接池中的最大空闲连接 + min-idle: 2 # 连接池中的最小空闲连接 + kafka: + bootstrap-servers: 172.16.12.55:9092,172.16.12.56:9092,172.16.12.57:9092 + producer: + retries: 3 + acks: all + batch-size: 4096 + buffer-memory: 102476800 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + jpa: + database-platform: org.hibernate.dialect.MySQL8Dialect + hibernate: + naming: + physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl + +logging: + file: + path: ./logs + +management: + endpoints: + web: + 
exposure: + include: "*" + endpoint: + health: + show-details: always + +send: + topic: analyze + +thread: + handler: 50 + send: 20 diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..7aef64c --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,36 @@ + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + + + + + true + + ${logging.level} + + + ${logging.file.path}/data-filter.log + + + ${logging.file.path}/data-filter.log.%d{yyyy-MM-dd} + 3 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + UTF-8 + + + + + + + + diff --git a/src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java b/src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java new file mode 100644 index 0000000..e4dbad4 --- /dev/null +++ b/src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java @@ -0,0 +1,13 @@ +package com.bfd.crawl.datafilter; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +/** Smoke test for the application: the single test method is intentionally empty, so the only thing exercised is whatever {@code @SpringBootTest} does while setting the test up (presumably starting the full application context; confirm against the Spring Boot testing documentation). */ +@SpringBootTest +class DataFilterApplicationTests { + + @Test + void contextLoads() { + } + +}