
Filter application

commit 6621b5f0bf (master, 6 months ago)
  1. .gitignore (+34)
  2. README.md (+1)
  3. pom.xml (+130)
  4. src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java (+24)
  5. src/main/java/com/bfd/crawl/datafilter/bean/Constants.java (+36)
  6. src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java (+59)
  7. src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java (+48)
  8. src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java (+39)
  9. src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java (+14)
  10. src/main/java/com/bfd/crawl/datafilter/entity/FilterType.java (+49)
  11. src/main/java/com/bfd/crawl/datafilter/enums/ResponseCode.java (+32)
  12. src/main/java/com/bfd/crawl/datafilter/exception/GlobalExceptionHandler.java (+32)
  13. src/main/java/com/bfd/crawl/datafilter/service/HandlerService.java (+331)
  14. src/main/java/com/bfd/crawl/datafilter/service/SendService.java (+47)
  15. src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java (+79)
  16. src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java (+61)
  17. src/main/java/com/bfd/crawl/datafilter/util/DateUtil.java (+72)
  18. src/main/java/com/bfd/crawl/datafilter/util/QueueUtil.java (+16)
  19. src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java (+94)
  20. src/main/resources/application.yml (+64)
  21. src/main/resources/logback-spring.xml (+36)
  22. src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java (+13)

.gitignore (+34)

@@ -0,0 +1,34 @@
HELP.md
target/
logs/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

README.md (+1)

@@ -0,0 +1 @@
Filter

pom.xml (+130)

@@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bfd.crawl</groupId>
<artifactId>dataFilter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>dataFilter</name>
<description>dataFilter</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.2.4.RELEASE</spring-boot.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--redis-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--JPA-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--JSON-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bfd.util</groupId>
<artifactId>pauseTool</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>dataFilter-0.0.1-SNAPSHOT</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<includeSystemScope>true</includeSystemScope>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

src/main/java/com/bfd/crawl/datafilter/DataFilterApplication.java (+24)

@@ -0,0 +1,24 @@
package com.bfd.crawl.datafilter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author:jinming
* @className:DataFilterApplication
* @version:1.0
* @description:
* @Date:2023/7/31 17:53
*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class DataFilterApplication {
public static void main(String[] args) {
SpringApplication.run(DataFilterApplication.class, args);
}
}

src/main/java/com/bfd/crawl/datafilter/bean/Constants.java (+36)

@@ -0,0 +1,36 @@
package com.bfd.crawl.datafilter.bean;
/**
* @author:jinming
* @className:Constants
* @version:1.0
* @description:
* @Date:2023/7/14 10:41
*/
public class Constants {
/**
* Query time field
*/
public static String TIME = "time";
/**
* Query start time
*/
public static String START_TIME = "startTime";
/**
* Query end time
*/
public static String END_TIME = "endTime";
/**
* Words to exclude from query results
*/
public static String EXCLUSION_WORDS = "exclusionWords";
/**
* Keywords that must be hit by the query
*/
public static String INCLUD_WORDS = "includWords";
}

src/main/java/com/bfd/crawl/datafilter/bean/ResponsePo.java (+59)

@@ -0,0 +1,59 @@
package com.bfd.crawl.datafilter.bean;
import com.bfd.crawl.datafilter.enums.ResponseCode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author:jinming
* @className:ResponsePo
* @version:1.0
* @description:
* @Date:2023/4/3 17:23
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResponsePo {
/**
* Response code
*/
private int code;
/**
* Returned data (JSON string) on success
*/
private Object data;
/**
* Message
*/
private String message;
public static ResponsePo success() {
return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage());
}
public static ResponsePo error() {
return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage());
}
public static ResponsePo setStatus(int code, String message) {
ResponsePo resultBean = new ResponsePo();
resultBean.code = code;
resultBean.message = message;
return resultBean;
}
public ResponsePo(int code, String message) {
this.code = code;
this.message = message;
}
public ResponsePo(ResponseCode responseCode) {
this.code = responseCode.getCode();
this.message = responseCode.getMessage();
}
}

src/main/java/com/bfd/crawl/datafilter/config/AsyncThreadConfiguration.java (+48)

@@ -0,0 +1,48 @@
package com.bfd.crawl.datafilter.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author jinming
* @version 1.0
* @className AsyncThreadConfiguration
* @Date 2022/2/17 18:37
*/
@Configuration
@EnableAsync
public class AsyncThreadConfiguration {
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(500);
// Maximum pool size
executor.setMaxPoolSize(500);
// Task queue capacity
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("dataFilter-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
@Bean(name = "sendExecutor")
public Executor sendExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(500);
// Maximum pool size
executor.setMaxPoolSize(500);
// Task queue capacity
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("sendData-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}

src/main/java/com/bfd/crawl/datafilter/controller/DataFilterController.java (+39)

@@ -0,0 +1,39 @@
package com.bfd.crawl.datafilter.controller;
import com.alibaba.fastjson.JSON;
import com.bfd.crawl.datafilter.bean.ResponsePo;
import com.bfd.crawl.datafilter.enums.ResponseCode;
import com.bfd.crawl.datafilter.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* @author:jinming
* @className:DataFilterController
* @version:1.0
* @description:
* @Date:2023/7/26 11:21
*/
@RestController
@RequestMapping("/handlerdata")
@Slf4j
public class DataFilterController {
@PostMapping("/filter")
public ResponsePo documentFeedback(@RequestBody String dataJson) {
ResponsePo responsePo = ResponsePo.success();
log.info("新增任务:" + dataJson);
try {
QueueUtil.taskQueue.put(dataJson);
} catch (InterruptedException e) {
e.printStackTrace();
}
return responsePo;
}
}
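
Below is a minimal client sketch (illustrative, not part of the diff above) showing how a task could be posted to this endpoint. It assumes the service listens on port 7088 (per application.yml) and that the payload carries the id/input/data fields read by HandlerService.run(); the condition id 6004 and the key/value pairs are placeholders that depend on the rows present in the filter_type table.

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

public class FilterClientSketch {
    public static void main(String[] args) {
        // Illustrative payload; "content" in data holds a JSON string that the key "content:$.title" points into.
        String task = "{\"id\":1,"
                + "\"input\":{\"caseSensitive\":false,\"or\":[[{\"id\":6004,\"key\":\"content:$.title\",\"value\":\"test\"}]]},"
                + "\"data\":{\"content\":\"{\\\"title\\\":\\\"a test title\\\"}\"}}";
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        // Host and port are assumptions taken from the application.yml in this commit.
        String response = new RestTemplate().postForObject(
                "http://localhost:7088/handlerdata/filter", new HttpEntity<>(task, headers), String.class);
        System.out.println(response);
    }
}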

src/main/java/com/bfd/crawl/datafilter/dao/FilterTypeDao.java (+14)

@@ -0,0 +1,14 @@
package com.bfd.crawl.datafilter.dao;
import com.bfd.crawl.datafilter.entity.FilterType;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* @author jinming
* @version 1.0
* @className FilterTypeDao
* @Date 2023/7/26 16:28
*/
public interface FilterTypeDao extends JpaRepository<FilterType, Integer> {
FilterType getFilterTypeById(int id);
}

src/main/java/com/bfd/crawl/datafilter/entity/FilterType.java (+49)

@@ -0,0 +1,49 @@
package com.bfd.crawl.datafilter.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
/**
* @author:jinming
* @className:FilterType
* @version:1.0
* @description:
* @Date:2023/7/26 10:58
*/
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "filter_type")
public class FilterType {
/**
* Auto-increment primary key
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private Integer id;
/**
* Type: 66 = operation method, 67 = operation condition
*/
@Column(name = "type")
private String type;
/**
* Description
*/
@Column(name = "describe")
private String describe;
/**
* Parent id
*/
@Column(name = "parent_id")
private Integer parentId;
/**
* Level
*/
@Column(name = "level")
private String level;
}

src/main/java/com/bfd/crawl/datafilter/enums/ResponseCode.java (+32)

@@ -0,0 +1,32 @@
package com.bfd.crawl.datafilter.enums;
/**
* @author:jinming
* @className:ResponseCodeEnum
* @version:1.0
* @description: response code enum
* @Date:2023/2/28 11:40
*/
public enum ResponseCode {
// Response code constants
SUCCESS(200, "操作成功"),
FAILURE(400, "参数错误"),
INTERNAL_SERVER_ERROR(500, "服务器内部错误"),
TYPE_NOT_SUPPORT(601,"文件类型不支持");
private int code;
private String message;
ResponseCode(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
}

src/main/java/com/bfd/crawl/datafilter/exception/GlobalExceptionHandler.java (+32)

@@ -0,0 +1,32 @@
package com.bfd.crawl.datafilter.exception;
import com.bfd.crawl.datafilter.bean.ResponsePo;
import com.bfd.crawl.datafilter.enums.ResponseCode;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* @author:jinming
* @className:GlobalExceptionHandler
* @version:1.0
* @description: global exception handler
* @Date:2023/2/28 16:29
*/
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(value = {IllegalArgumentException.class})
@ResponseStatus(HttpStatus.BAD_REQUEST)
public ResponsePo handleBadRequest(Exception ex) {
return new ResponsePo(ResponseCode.FAILURE.getCode(), ex.getMessage());
}
@ExceptionHandler(value = {Exception.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public ResponsePo handleException(Exception ex) {
return new ResponsePo(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), ex.getMessage());
}
}

src/main/java/com/bfd/crawl/datafilter/service/HandlerService.java (+331)

@@ -0,0 +1,331 @@
package com.bfd.crawl.datafilter.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.bfd.crawl.datafilter.dao.FilterTypeDao;
import com.bfd.crawl.datafilter.entity.FilterType;
import com.bfd.crawl.datafilter.util.DataUtil;
import com.bfd.crawl.datafilter.util.DateUtil;
import com.bfd.crawl.datafilter.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author:jinming
* @className:HandlerService
* @version:1.0
* @description:
* @Date:2023/7/26 11:29
*/
@Service
@Slf4j
public class HandlerService {
@Autowired
private FilterTypeDao filterTypeDao;
@Async("asyncExecutor")
public void run() {
while (true) {
if (QueueUtil.taskQueue.size() > 0) {
String dataJson = null;
try {
dataJson = QueueUtil.taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
Map parse = null;
try {
parse = (Map) JSON.parse(dataJson);
} catch (Exception e) {
e.printStackTrace();
}
try {
Map data = (Map) parse.get("data");
int id = (int) parse.get("id");
log.info("任务:" + id + "已开始处理");
boolean needSend = false;
Map admin = (Map) parse.get("input");
Map dataMap = (Map) parse.get("data");
log.info("任务:" + id + "的判断条件为:" + JSON.toJSONString(admin));
boolean caseSensitive = true;
try {
caseSensitive = (boolean) admin.get("caseSensitive");
} catch (Exception e) {
// caseSensitive not provided in the input; keep the default (true)
}
List<List<Map>> conditions = (List<List<Map>>) admin.get("or");
for (List<Map> condition : conditions) {
for (Map andCondition : condition) {
int conditionId = (int) andCondition.get("id");
FilterType filterType = filterTypeDao.getFilterTypeById(conditionId);
int typeId = filterType.getParentId();
switch (typeId) {
case 6000:
needSend = textTypeHandler(conditionId, andCondition, dataMap, caseSensitive);
break;
case 6001:
needSend = numberTypeHandler(conditionId, andCondition, dataMap);
break;
case 6002:
needSend = dateTypeHandler(conditionId, andCondition, dataMap);
break;
case 6025:
needSend = normalHandler(conditionId, andCondition, dataMap);
break;
default:
break;
}
if (!needSend) {
break;
}
}
if (needSend) {
break;
}
}
if (needSend) {
log.info("任务:" + id + "满足" + JSON.toJSONString(admin) + "条件,发送数据到指定Kafka");
Map result = new HashMap(32);
Map resultMap = new HashMap(32);
resultMap.put("isLast", 1);
resultMap.put("content", "数据满足条件过滤成功");
result.put("results", JSON.toJSONString(resultMap));
result.put("status", 1);
result.put("message", "");
parse.put("result", result);
String message = JSON.toJSONString(parse);
QueueUtil.sendQueue.put(message);
} else {
log.info("任务:" + id + "不满足" + JSON.toJSONString(admin) + "条件,发送数据到指定Kafka");
// Map result = new HashMap(32);
// Map resultMap = new HashMap(32);
// resultMap.put("isLast", 1);
// resultMap.put("content", "数据不满足条件过滤");
// resultMap.put("status", 3);
// result.put("results", JSON.toJSONString(resultMap));
// result.put("message", "");
// parse.put("result", result);
// String message = JSON.toJSONString(parse);
// QueueUtil.sendQueue.put(message);
}
} catch (Throwable e) {
log.error("处理程序发生异常:", e);
log.error("任务发生异常:{}", dataJson);
e.printStackTrace();
Map result = new HashMap(32);
Map resultMap = new HashMap(32);
resultMap.put("isLast", 1);
resultMap.put("content", "数据满足条件过滤成功");
result.put("results", JSON.toJSONString(resultMap));
result.put("status", 2);
result.put("message", "未知异常");
parse.put("result", result);
// String message = JSON.toJSONString(parse);
// try {
// QueueUtil.sendQueue.put(message);
// } catch (InterruptedException ex) {
// ex.printStackTrace();
// }
}
} else {
log.info("任务队列为空,休眠10秒");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private boolean normalHandler(int id, Map conditionMap, Map dataMap) {
boolean condition = false;
String key = (String) conditionMap.get("key");
String value = (String) conditionMap.get("value");
Object dataValue = DataUtil.getValue(key, dataMap);
switch (id) {
case 6024:
// value does not exist (resolved to null)
condition = dataValue == null;
break;
case 6026:
Map valueMap = (Map) DataUtil.getValue(key, dataMap);
condition = valueMap.containsKey(value);
break;
case 6027:
List<Object> valueList = (List<Object>) DataUtil.getValue(key, dataMap);
condition = valueList.size() == 0;
break;
case 6028:
List<Object> objectList = (List<Object>) DataUtil.getValue(key, dataMap);
condition = objectList.size() != 0;
break;
default:
break;
}
return condition;
}
private boolean textTypeHandler(int id, Map conditionMap, Map dataMap, boolean caseSensitive) {
boolean condition = false;
String key = (String) conditionMap.get("key");
String value = (String) conditionMap.get("value");
String dataValue = (String) DataUtil.getValue(key, dataMap);
switch (id) {
case 6003:
if (caseSensitive) {
condition = dataValue.equals(value);
} else {
dataValue = dataValue.toLowerCase();
condition = dataValue.equals(value.toLowerCase());
}
break;
case 6004:
if (caseSensitive) {
condition = dataValue.contains(value);
} else {
dataValue = dataValue.toLowerCase();
condition = dataValue.contains(value.toLowerCase());
}
break;
case 6005:
if (caseSensitive) {
condition = dataValue.startsWith(value);
} else {
dataValue = dataValue.toLowerCase();
condition = dataValue.startsWith(value.toLowerCase());
}
break;
case 6006:
if (caseSensitive) {
condition = dataValue.endsWith(value);
} else {
dataValue = dataValue.toLowerCase();
condition = dataValue.endsWith(value.toLowerCase());
}
break;
case 6007:
Pattern p = Pattern.compile(value);
Matcher m = p.matcher(dataValue);
condition = m.find();
break;
case 6020:
if (caseSensitive) {
condition = !dataValue.equals(value);
} else {
dataValue = dataValue.toLowerCase();
condition = !dataValue.equals(value.toLowerCase());
}
break;
case 6023:
if (caseSensitive) {
condition = !dataValue.contains(value);
} else {
dataValue = dataValue.toLowerCase();
condition = !dataValue.contains(value.toLowerCase());
}
break;
default:
break;
}
return condition;
}
private boolean numberTypeHandler(int id, Map conditionMap, Map dataMap) {
boolean condition = false;
String key = (String) conditionMap.get("key");
int value = Integer.parseInt(conditionMap.get("value").toString());
int dataValue = Integer.parseInt(String.valueOf(DataUtil.getValue(key, dataMap)));
switch (id) {
case 6008:
condition = dataValue > value;
break;
case 6009:
condition = dataValue < value;
break;
case 6010:
condition = dataValue == value;
break;
case 6011:
condition = dataValue >= value;
break;
case 6012:
condition = dataValue <= value;
break;
case 6021:
condition = dataValue != value;
break;
default:
break;
}
return condition;
}
private boolean dateTypeHandler(int id, Map conditionMap, Map dataMap) {
String format = "yyyy-MM-dd HH:mm:ss";
boolean condition = false;
String key = (String) conditionMap.get("key");
long valueTimestamp = 0;
try {
String value = (String) conditionMap.get("value");
valueTimestamp = convertDateTimeToTimestamp(value, format);
} catch (Exception e) {
e.printStackTrace();
}
String dataValue = (String) DataUtil.getValue(key, dataMap);
long dataValueTimestamp = convertDateTimeToTimestamp(dataValue, format);
switch (id) {
case 6013:
condition = dataValueTimestamp > valueTimestamp;
break;
case 6014:
condition = dataValueTimestamp < valueTimestamp;
break;
case 6022:
String theDayBegin = DateUtil.theDayBegin();
String theDayEnd = DateUtil.theDayEnd();
condition = convertDateTimeToTimestamp(theDayBegin, format) < dataValueTimestamp && dataValueTimestamp < convertDateTimeToTimestamp(theDayEnd, format);
break;
default:
break;
}
return condition;
}
private static long convertDateTimeToTimestamp(String datetimeString, String format) {
SimpleDateFormat sdf = new SimpleDateFormat(format);
try {
Date date = sdf.parse(datetimeString);
return date.getTime();
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static void main(String[] args) {
String json = "{\"charset\":\"UTF-8\",\"iid\":\"57dc5083f49dd088c413e1f28572e83a\",\"length\":21909,\"tmpl_id\":3838,\"type\":\"newslist\",\"version\":\"6\",\"news_id\":\"57dc5083f49dd088c413e1f28572e83a\",\"url\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=0&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"nextpage\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"host\":\"172.18.1.182\",\"category\":1,\"items\":[{\"link\":{\"iid\":\"93b44cdcad298b82cecd5e0c09cb078d\",\"link\":\"https://new.qq.com/rain/a/20231031A012QT00\",\"linktype\":\"newscontent\",\"rawlink\":\"https://new.qq.com/rain/a/20231031A012QT00\"},\"posttime\":\"2023-10-31 08:02:07\",\"title\":\"\u200B中学周末校内托管服务,确定符合“双减”精神?\"}],\"tasks\":[{\"iid\":\"93b44cdcad298b82cecd5e0c09cb078d\",\"link\":\"https://new.qq.com/rain/a/20231031A012QT00\",\"linktype\":\"newscomment\",\"rawlink\":\"rain/a/20231031A012QT00\"},{\"iid\":\"f3d8ce4383320a7dc7c6270c74297272\",\"link\":\"https://new.qq.com/rain/a/20231030A09KEE00\",\"linktype\":\"newscontent\",\"rawlink\":\"https://new.qq.com/rain/a/20231030A09KEE00\"},{\"iid\":\"6a275c516abc33e2b216a08fb3b8668e\",\"link\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\",\"linktype\":\"newslist\",\"rawlink\":\"https://i.news.qq.com/trpc.qqnews_web.kv_srv.kv_srv_http_proxy/list?sub_srv_id=edu&srv_id=pc&offset=20&limit=20&strategy=1&ext=%7B%22pool%22%3A%5B%22top%22%2C%22hot%22%5D%2C%22is_filter%22%3A10%2C%22check_type%22%3Atrue%7D\"}],\"cid\":\"NtengxunNews\"}";
JSONObject jsonObject = JSON.parseObject(json);
System.out.println(JSONPath.eval(jsonObject, "$$$$.['tasks'][0]['iissssd']"));
}
}
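
For readers of run(): the "or" entry of the input map is a list of AND-groups — every condition inside a group must hold, and any one group passing makes the data pass. The sketch below (illustrative, not part of the diff) builds such an input with fastjson; condition ids 6004 and 6013 are assumed to exist in the filter_type table under parents 6000 (text) and 6002 (date) respectively, and the keys follow the "field:jsonPath" format handled by DataUtil.

import com.alibaba.fastjson.JSON;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConditionSketch {
    public static void main(String[] args) {
        // AND-group member 1: text "contains" check (assumed id 6004, parent 6000)
        Map<String, Object> contains = new HashMap<>();
        contains.put("id", 6004);
        contains.put("key", "content:$.title");
        contains.put("value", "education");
        // AND-group member 2: "later than" date check (assumed id 6013, parent 6002)
        Map<String, Object> notBefore = new HashMap<>();
        notBefore.put("id", 6013);
        notBefore.put("key", "content:$.posttime");
        notBefore.put("value", "2023-10-01 00:00:00");
        // One OR alternative made of the two AND-ed conditions above
        Map<String, Object> input = new HashMap<>();
        input.put("caseSensitive", false);
        input.put("or", Collections.singletonList(Arrays.asList(contains, notBefore)));
        System.out.println(JSON.toJSONString(input));
    }
}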

src/main/java/com/bfd/crawl/datafilter/service/SendService.java (+47)

@@ -0,0 +1,47 @@
package com.bfd.crawl.datafilter.service;
import com.bfd.crawl.datafilter.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @author:jinming
* @className:SendService
* @version:1.0
* @description:
* @Date:2023/7/31 17:53
*/
@Slf4j
@Service
public class SendService {
@Value("${send.topic}")
private String topic;
@Autowired
private KafkaTemplate kafkaTemplate;
@Async("sendExecutor")
public void sendToKafka() {
while (true) {
if (QueueUtil.sendQueue.size() > 0) {
try {
String message = QueueUtil.sendQueue.take();
kafkaTemplate.send(topic,message);
} catch (Exception e) {
e.printStackTrace();
}
} else {
log.info("发送队列为空,休眠3秒");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

src/main/java/com/bfd/crawl/datafilter/service/StartServcie.java (+79)

@@ -0,0 +1,79 @@
package com.bfd.crawl.datafilter.service;
import com.bfd.crawl.datafilter.util.QueueUtil;
import com.bfd.util.PauseTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author:jinming
* @className:StartServcie
* @version:1.0
* @description:
* @Date:2023/7/31 17:14
*/
@Service
@Slf4j
@Order(value = 1)
public class StartServcie implements ApplicationRunner {
@Value("${thread.handler}")
private int handlerNumber;
@Value("${thread.send}")
private int sendNumber;
@Autowired
private HandlerService handlerService;
@Autowired
private SendService sendService;
// @Value("${zookeeper.connection-string}")
// private String connectionString;
// @Value("${zookeeper.publish-node}")
// private String nodePath;
// @Resource
// private StringRedisTemplate stringRedisTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
// PauseTool pauseTool = new PauseTool();
// pauseTool.initializeRedisCache(stringRedisTemplate);
// pauseTool.setupZookeeperListener(connectionString, nodePath);
for (int i = 0; i < handlerNumber; i++) {
log.info("处理服务线程" + i + "已启动 ");
handlerService.run();
}
for (int i = 0; i < sendNumber; i++) {
log.info("发送服务线程" + i + "已启动 ");
sendService.sendToKafka();
}
// Anonymous Runnable that periodically logs the queue sizes
Runnable myRunnable = new Runnable() {
@Override
public void run() {
// The monitoring task executed by the thread
while (true) {
log.info("任务队列长度为" + QueueUtil.taskQueue.size());
log.info("发送队列长度为" + QueueUtil.sendQueue.size());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
// Create a new thread for the Runnable
Thread myThread = new Thread(myRunnable);
// Start the monitoring thread
myThread.start();
}
}

src/main/java/com/bfd/crawl/datafilter/util/DataUtil.java (+61)

@@ -0,0 +1,61 @@
package com.bfd.crawl.datafilter.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* @author:jinming
* @className:DataUtil
* @version:1.0
* @description: resolve dataValue from the data map
* @Date:2023/11/1 9:54
*/
@Slf4j
public class DataUtil {
/**
* @param key the lookup expression
* @param dataMap the data map
* @return the dataValue resolved from the given parameters, or an empty string on failure
*/
public static Object getValue(String key, Map dataMap) {
try {
// empty expression, return immediately
if (!StringUtil.hasValue(key)) {
return "";
}
Object dataValue;
String isJson = "#json#";
if (key.contains(isJson)) {
// first split: take the part before #json#
String[] keySplit = key.split(isJson);
String firstDataKey = keySplit[0];
String[] firstDataKeySplit = firstDataKey.split(":");
// fetch the JSON string referenced by the first part and parse it
String dataJson = (String) dataMap.get(firstDataKeySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
// extract the nested JSON string addressed by the JSONPath after the colon
String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]);
String secDataKey = keySplit[1];
JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson);
dataValue = JSONPath.eval(firstDataJsonObject, secDataKey);
return dataValue;
}
String[] keySplit = key.split(":");
String jsonPath = keySplit[1];
String dataJson = (String) dataMap.get(keySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
dataValue = JSONPath.eval(dataJsonObject, jsonPath);
return dataValue;
} catch (Exception e) {
// TODO: handle exception
log.error("jsonpath公式取值异常,", e);
return "";
}
}
}
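
A usage sketch (illustrative, not part of the diff) for the two key formats getValue() understands — the plain "field:jsonPath" form and the nested "field:jsonPath#json#innerJsonPath" form; the field names and JSON content are made up for illustration.

import com.bfd.crawl.datafilter.util.DataUtil;
import java.util.HashMap;
import java.util.Map;

public class DataUtilSketch {
    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        // "content" holds a JSON string; its "detail" field is itself an escaped JSON string.
        dataMap.put("content", "{\"title\":\"hello\",\"detail\":\"{\\\"count\\\":3}\"}");
        // Plain form: <field>:<jsonPath>  -> prints hello
        System.out.println(DataUtil.getValue("content:$.title", dataMap));
        // Nested form: <field>:<jsonPath>#json#<jsonPath into the nested JSON string>  -> prints 3
        System.out.println(DataUtil.getValue("content:$.detail#json#$.count", dataMap));
    }
}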

src/main/java/com/bfd/crawl/datafilter/util/DateUtil.java (+72)

@@ -0,0 +1,72 @@
package com.bfd.crawl.datafilter.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author jinming
* @version 1.0
* @className DateUtil
* @Date 2022/7/29 15:49
*/
public class DateUtil {
public static String theDayBegin() {
return new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + " 00:00:00";
}
public static String theDayEnd() {
return new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + " 23:59:59";
}
public static String theTaskBegin() {
String theDayBegin = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + " 00:00:00";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd").parse(theDayBegin);
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(parse.getTime() - 7200000)).toString();
} catch (ParseException e) {
e.printStackTrace();
}
return "";
}
public static String theMonthBegin() {
String theDayBegin = new SimpleDateFormat("yyyy-MM").format(new Date()) + "-01 00:00:00";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd").parse(theDayBegin);
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(parse.getTime()));
} catch (ParseException e) {
e.printStackTrace();
}
return "";
}
public static long timeToLong(String time) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date parse = null;
try {
parse = simpleDateFormat.parse(time);
} catch (ParseException e) {
e.printStackTrace();
}
return parse.getTime();
}
public static String fomateTime(long time) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String parse = "";
parse = simpleDateFormat.format(time);
return parse;
}
public static void main(String[] args) {
System.out.println(DateUtil.theTaskBegin());
}
}

src/main/java/com/bfd/crawl/datafilter/util/QueueUtil.java (+16)

@@ -0,0 +1,16 @@
package com.bfd.crawl.datafilter.util;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author:jinming
* @className:QueueUtil
* @version:1.0
* @description:
* @Date:2023/7/13 15:00
*/
public class QueueUtil {
public static LinkedBlockingDeque<String> taskQueue = new LinkedBlockingDeque<String>();
public static LinkedBlockingDeque<String> sendQueue = new LinkedBlockingDeque<String>();
}

src/main/java/com/bfd/crawl/datafilter/util/StringUtil.java (+94)

@@ -0,0 +1,94 @@
package com.bfd.crawl.datafilter.util;
import lombok.extern.slf4j.Slf4j;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author jinming
* @version 1.0
* @className StringUtile
* @Date 2022/1/21 11:46
*/
@Slf4j
public class StringUtil {
public static boolean hasValue(String str) {
return str != null && !"".equals(str.trim());
}
public static String getRegexGroup(String regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
}
return resultStr;
}
public static Set<String> getEmailAddress(String message) {
Set<String> emailList = new HashSet<>();
Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+");
Matcher m = pattern.matcher(message);
while (m.find()) {
emailList.add(m.group(0));
}
return emailList;
}
public static String getMd5(String string) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] bs = md5.digest(string.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder(40);
for (byte x : bs) {
if ((x & 0xff) >> 4 == 0) {
sb.append("0").append(Integer.toHexString(x & 0xff));
} else {
sb.append(Integer.toHexString(x & 0xff));
}
}
return sb.toString();
} catch (Exception e) {
//LOG.error("获取md5异常", e);
return "nceaform" + System.currentTimeMillis();
}
}
public static String removeAllHtmlTags(String str) {
return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : "";
}
public static String getRegexGroup(Pattern regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Matcher m = regex.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
log.error(regex + " parser error!");
}
return resultStr;
}
public static String getStrByPattern(String str, String regex) {
Pattern pattern = Pattern.compile(regex);
Matcher m = pattern.matcher(str);
return m.find() ? m.group(0) : "";
}
}

src/main/resources/application.yml (+64)

@@ -0,0 +1,64 @@
server:
port: 7088
spring:
application:
name: 过滤器
boot:
admin:
client:
health:
timeout: 10s
url: http://172.16.12.55:8001
instance:
service-base-url: http://172.16.12.56:7088
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: crawl
password: crawl123
url: jdbc:mysql://172.26.11.110:3306/kyyzgpt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false
redis:
host: 172.24.12.126
port: 6379
timeout: 10000
database: 7
jedis:
pool:
max-active: 8 # maximum number of connections in the pool (negative value means no limit)
max-wait: 800 # maximum blocking wait time of the pool, in ms (negative value means no limit)
max-idle: 8 # maximum number of idle connections in the pool
min-idle: 2 # minimum number of idle connections in the pool
kafka:
bootstrap-servers: 172.16.12.55:9092,172.16.12.56:9092,172.16.12.57:9092
producer:
retries: 3
acks: all
batch-size: 4096
buffer-memory: 102476800
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
jpa:
database-platform: org.hibernate.dialect.MySQL8Dialect
hibernate:
naming:
physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
logging:
file:
path: ./logs
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
send:
topic: analyze
thread:
handler: 50
send: 20

src/main/resources/logback-spring.xml (+36)

@@ -0,0 +1,36 @@
<configuration>
<!-- Properties: resolved from the corresponding entries in the application configuration -->
<springProperty scope="context" name="logging.file.path" source="logging.file.path"/>
<springProperty scope="context" name="logging.level" source="logging.level"/>
<!-- Default console output; of little use in production where the app usually runs in the background -->
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.file.path}/data-filter.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.file.path}/data-filter.log.%d{yyyy-MM-dd}</FileNamePattern>
<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<appender-ref ref="STDOUT"/>
</root>
</configuration>

src/test/java/com/bfd/crawl/datafilter/DataFilterApplicationTests.java (+13)

@@ -0,0 +1,13 @@
package com.bfd.crawl.datafilter;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DataFilterApplicationTests {
@Test
void contextLoads() {
}
}