commit
f4b9fae646
23 changed files with 1562 additions and 0 deletions
-
33.gitignore
-
1README.md
-
124pom.xml
-
30src/main/java/com/bfd/crawl/handlerdata/HandlerDataApplication.java
-
54src/main/java/com/bfd/crawl/handlerdata/bean/Constants.java
-
59src/main/java/com/bfd/crawl/handlerdata/bean/ResponsePo.java
-
48src/main/java/com/bfd/crawl/handlerdata/config/AsyncThreadConfiguration.java
-
25src/main/java/com/bfd/crawl/handlerdata/config/SchedulerConfig.java
-
25src/main/java/com/bfd/crawl/handlerdata/config/ZookeeperConfig.java
-
58src/main/java/com/bfd/crawl/handlerdata/controller/HandlerDataController.java
-
32src/main/java/com/bfd/crawl/handlerdata/enums/ResponseCode.java
-
31src/main/java/com/bfd/crawl/handlerdata/exception/GlobalExceptionHandler.java
-
424src/main/java/com/bfd/crawl/handlerdata/service/ElasticSearchService.java
-
115src/main/java/com/bfd/crawl/handlerdata/service/HandlerService.java
-
46src/main/java/com/bfd/crawl/handlerdata/service/SendService.java
-
67src/main/java/com/bfd/crawl/handlerdata/service/StartServcie.java
-
42src/main/java/com/bfd/crawl/handlerdata/service/TaskService.java
-
64src/main/java/com/bfd/crawl/handlerdata/service/ZookeeperNodeMonitor.java
-
73src/main/java/com/bfd/crawl/handlerdata/util/CronExpressionGeneratorUtil.java
-
19src/main/java/com/bfd/crawl/handlerdata/util/QueueUtil.java
-
94src/main/java/com/bfd/crawl/handlerdata/util/StringUtil.java
-
59src/main/resources/application.yml
-
39src/main/resources/logback-spring.xml
@ -0,0 +1,33 @@ |
|||
HELP.md |
|||
target/ |
|||
!.mvn/wrapper/maven-wrapper.jar |
|||
!**/src/main/**/target/ |
|||
!**/src/test/**/target/ |
|||
|
|||
### STS ### |
|||
.apt_generated |
|||
.classpath |
|||
.factorypath |
|||
.project |
|||
.settings |
|||
.springBeans |
|||
.sts4-cache |
|||
|
|||
### IntelliJ IDEA ### |
|||
.idea |
|||
*.iws |
|||
*.iml |
|||
*.ipr |
|||
|
|||
### NetBeans ### |
|||
/nbproject/private/ |
|||
/nbbuild/ |
|||
/dist/ |
|||
/nbdist/ |
|||
/.nb-gradle/ |
|||
build/ |
|||
!**/src/main/**/build/ |
|||
!**/src/test/**/build/ |
|||
|
|||
### VS Code ### |
|||
.vscode/ |
|||
@ -0,0 +1 @@ |
|||
采集平台应用 |
|||
@ -0,0 +1,124 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
<groupId>com.bfd.crawl</groupId> |
|||
<artifactId>handlerData</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
<name>handlerData</name> |
|||
<description>handlerData</description> |
|||
<properties> |
|||
<java.version>1.8</java.version> |
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> |
|||
<spring-boot.version>2.2.4.RELEASE</spring-boot.version> |
|||
</properties> |
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>de.codecentric</groupId> |
|||
<artifactId>spring-boot-admin-client</artifactId> |
|||
<version>2.2.4</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.google.code.gson</groupId> |
|||
<artifactId>gson</artifactId> |
|||
<version>2.8.8</version> |
|||
</dependency> |
|||
<!-- <dependency>--> |
|||
<!-- <groupId>org.springframework.boot</groupId>--> |
|||
<!-- <artifactId>spring-boot-starter</artifactId>--> |
|||
<!-- </dependency>--> |
|||
<dependency> |
|||
<groupId>org.springframework.kafka</groupId> |
|||
<artifactId>spring-kafka</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-starter-web</artifactId> |
|||
</dependency> |
|||
<!--JSON--> |
|||
<dependency> |
|||
<groupId>com.alibaba</groupId> |
|||
<artifactId>fastjson</artifactId> |
|||
<version>1.2.0</version> |
|||
</dependency> |
|||
<!--OKHTTP--> |
|||
<dependency> |
|||
<groupId>com.squareup.okhttp3</groupId> |
|||
<artifactId>okhttp</artifactId> |
|||
<version>3.9.1</version> |
|||
</dependency> |
|||
<!-- <dependency>--> |
|||
<!-- <groupId>org.springframework.boot</groupId>--> |
|||
<!-- <artifactId>spring-boot-devtools</artifactId>--> |
|||
<!-- <scope>runtime</scope>--> |
|||
<!-- <optional>true</optional>--> |
|||
<!-- </dependency>--> |
|||
<dependency> |
|||
<groupId>org.projectlombok</groupId> |
|||
<artifactId>lombok</artifactId> |
|||
<optional>true</optional> |
|||
</dependency> |
|||
<!-- <dependency>--> |
|||
<!-- <groupId>org.springframework.boot</groupId>--> |
|||
<!-- <artifactId>spring-boot-starter-test</artifactId>--> |
|||
<!-- <scope>test</scope>--> |
|||
<!-- </dependency>--> |
|||
<dependency> |
|||
<groupId>org.springframework.kafka</groupId> |
|||
<artifactId>spring-kafka-test</artifactId> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.kafka</groupId> |
|||
<artifactId>kafka-clients</artifactId> |
|||
<version>2.3.1</version> <!--根据您正在使用的Kafka版本选择合适的版本号--> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.bfd.util</groupId> |
|||
<artifactId>pauseTool</artifactId> |
|||
<version>1.0</version> |
|||
</dependency> |
|||
</dependencies> |
|||
<dependencyManagement> |
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-dependencies</artifactId> |
|||
<version>${spring-boot.version}</version> |
|||
<type>pom</type> |
|||
<scope>import</scope> |
|||
</dependency> |
|||
</dependencies> |
|||
</dependencyManagement> |
|||
|
|||
<build> |
|||
<finalName>handlerData-0.0.1-SNAPSHOT</finalName> |
|||
<plugins> |
|||
<plugin> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-maven-plugin</artifactId> |
|||
<version>2.4.1</version> |
|||
<configuration> |
|||
<includeSystemScope>true</includeSystemScope> |
|||
</configuration> |
|||
<executions> |
|||
<execution> |
|||
<goals> |
|||
<goal>repackage</goal> |
|||
</goals> |
|||
</execution> |
|||
</executions> |
|||
</plugin> |
|||
<plugin> |
|||
<groupId>org.apache.maven.plugins</groupId> |
|||
<artifactId>maven-compiler-plugin</artifactId> |
|||
<configuration> |
|||
<source>8</source> |
|||
<target>8</target> |
|||
</configuration> |
|||
</plugin> |
|||
</plugins> |
|||
</build> |
|||
|
|||
</project> |
|||
@ -0,0 +1,30 @@ |
|||
package com.bfd.crawl.handlerdata; |
|||
|
|||
import de.codecentric.boot.admin.client.registration.Application; |
|||
import de.codecentric.boot.admin.client.registration.ApplicationRegistrator; |
|||
import org.springframework.boot.SpringApplication; |
|||
import org.springframework.boot.SpringBootConfiguration; |
|||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|||
import org.springframework.boot.context.event.ApplicationReadyEvent; |
|||
import org.springframework.context.event.EventListener; |
|||
import org.springframework.scheduling.annotation.EnableAsync; |
|||
import org.springframework.scheduling.annotation.EnableScheduling; |
|||
import org.springframework.stereotype.Component; |
|||
/** |
|||
* @author:jinming |
|||
* @className:HandlerDataApplication |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2023/7/31 17:14 |
|||
*/ |
|||
@SpringBootApplication |
|||
@EnableAsync |
|||
@EnableScheduling |
|||
public class HandlerDataApplication { |
|||
|
|||
public static void main(String[] args) { |
|||
SpringApplication.run(HandlerDataApplication.class, args); |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
package com.bfd.crawl.handlerdata.bean; |
|||
|
|||
/**
 * Key names and keywords found in a task's {@code input} definition; they are
 * consumed when the Elasticsearch query is assembled.
 *
 * <p>All members are compile-time constants and must never be reassigned.
 *
 * @author jinming
 * @version 1.0
 */
public class Constants {

    /** Key of the publish-time range filter inside the task input. */
    public static final String TIME = "pubTime";

    /** Key of the inclusive lower bound of a time range. */
    public static final String START_TIME = "gte";

    /** Key of the inclusive upper bound of a time range. */
    public static final String END_TIME = "lte";

    /** Key of the list of keyword-condition groups inside the task input. */
    public static final String OR = "or";

    /** Key of the title keyword inside a keyword condition. */
    public static final String TITLE = "title";

    /** Key of the author keyword inside a keyword condition. */
    public static final String AUTHOR = "author";

    /** Key of the content keyword inside a keyword condition. */
    public static final String CONTENT = "content";

    /** Key telling whether a keyword condition includes or excludes matches. */
    public static final String TYPE = "type";

    /** {@link #TYPE} value: documents must match the keywords. */
    public static final String INCLUDE = "include";

    /** {@link #TYPE} value: documents matching the keywords are excluded. */
    public static final String EXCLUSION = "exclusion";

    /** Key of the map of attachment flags that documents are filtered by. */
    public static final String ATTACHMENTFILTER = "attachmentFilter";
}
|||
@ -0,0 +1,59 @@ |
|||
package com.bfd.crawl.handlerdata.bean; |
|||
|
|||
|
|||
import com.bfd.crawl.handlerdata.enums.ResponseCode; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:ResponsePo |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2023/4/3 17:23 |
|||
*/ |
|||
@Data |
|||
@NoArgsConstructor |
|||
@AllArgsConstructor |
|||
public class ResponsePo { |
|||
/** |
|||
* 响应码 |
|||
*/ |
|||
private int code; |
|||
|
|||
/** |
|||
* 正常放 返回数据 的JSON串 |
|||
*/ |
|||
private Object data; |
|||
|
|||
/** |
|||
* 提示消息 |
|||
*/ |
|||
private String message; |
|||
|
|||
public static ResponsePo success() { |
|||
return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage()); |
|||
} |
|||
|
|||
public static ResponsePo error() { |
|||
return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage()); |
|||
} |
|||
|
|||
public static ResponsePo setStatus(int code, String message) { |
|||
ResponsePo resultBean = new ResponsePo(); |
|||
resultBean.code = code; |
|||
resultBean.message = message; |
|||
return resultBean; |
|||
} |
|||
public ResponsePo(int code, String message) { |
|||
this.code = code; |
|||
this.message = message; |
|||
this.data = data; |
|||
} |
|||
public ResponsePo(ResponseCode responseCode){ |
|||
this.code = responseCode.getCode(); |
|||
this.message = responseCode.getMessage(); |
|||
this.data = data; |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
package com.bfd.crawl.handlerdata.config; |
|||
|
|||
|
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.scheduling.annotation.EnableAsync; |
|||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
|||
|
|||
import java.util.concurrent.Executor; |
|||
|
|||
/** |
|||
* @author jinming |
|||
* @version 1.0 |
|||
* @className AsyncThreadConfiguration |
|||
* @Date 2022/2/17 18:37 |
|||
*/ |
|||
@Configuration |
|||
@EnableAsync |
|||
public class AsyncThreadConfiguration { |
|||
@Bean(name = "asyncExecutor") |
|||
public Executor asyncExecutor() { |
|||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); |
|||
// 核心线程数 |
|||
executor.setCorePoolSize(500); |
|||
// 并发线程的数量限制为2 |
|||
executor.setMaxPoolSize(500); |
|||
// 线程队列 |
|||
executor.setQueueCapacity(500); |
|||
executor.setThreadNamePrefix("handlerData-"); |
|||
executor.initialize(); |
|||
executor.setWaitForTasksToCompleteOnShutdown(true); |
|||
return executor; |
|||
} |
|||
@Bean(name = "sendExecutor") |
|||
public Executor sendExecutor() { |
|||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); |
|||
// 核心线程数 |
|||
executor.setCorePoolSize(500); |
|||
// 并发线程的数量限制为2 |
|||
executor.setMaxPoolSize(500); |
|||
// 线程队列 |
|||
executor.setQueueCapacity(500); |
|||
executor.setThreadNamePrefix("sendData-"); |
|||
executor.initialize(); |
|||
executor.setWaitForTasksToCompleteOnShutdown(true); |
|||
return executor; |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package com.bfd.crawl.handlerdata.config; |
|||
|
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:SchedulerConfig |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2024/7/10 17:21 |
|||
*/ |
|||
@Configuration |
|||
public class SchedulerConfig { |
|||
|
|||
@Bean |
|||
public ThreadPoolTaskScheduler taskScheduler() { |
|||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); |
|||
scheduler.setPoolSize(300); |
|||
scheduler.setThreadNamePrefix("task-"); |
|||
scheduler.initialize(); |
|||
return scheduler; |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package com.bfd.crawl.handlerdata.config; |
|||
|
|||
import org.apache.curator.framework.CuratorFramework; |
|||
import org.apache.curator.framework.CuratorFrameworkFactory; |
|||
import org.apache.curator.retry.ExponentialBackoffRetry; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* @author jian.mao |
|||
* @date 2024年4月16日 |
|||
* @description |
|||
*/ |
|||
@Configuration |
|||
public class ZookeeperConfig { |
|||
@Value("${zookeeper.connection-string}") |
|||
private String connectionString; |
|||
@Bean |
|||
public CuratorFramework curatorFramework() { |
|||
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); |
|||
curatorFramework.start(); |
|||
return curatorFramework; |
|||
} |
|||
} |
|||
@ -0,0 +1,58 @@ |
|||
package com.bfd.crawl.handlerdata.controller; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.bfd.crawl.handlerdata.bean.ResponsePo; |
|||
import com.bfd.crawl.handlerdata.enums.ResponseCode; |
|||
import com.bfd.crawl.handlerdata.util.QueueUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.web.bind.annotation.PostMapping; |
|||
import org.springframework.web.bind.annotation.RequestBody; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:HandlerDataController |
|||
* @version:1.0 |
|||
* @description: 处理接口 |
|||
* @Date:2023/7/13 14:25 |
|||
*/ |
|||
|
|||
@RestController |
|||
@RequestMapping("/handlerdata") |
|||
@Slf4j |
|||
public class HandlerDataController { |
|||
|
|||
@PostMapping("/handler") |
|||
public ResponsePo documentFeedback(@RequestBody String dataJson) { |
|||
String trace = "trace"; |
|||
ResponsePo responsePo = ResponsePo.success(); |
|||
try { |
|||
Map parse = (Map) JSON.parse(dataJson); |
|||
log.info("新增任务:" + dataJson); |
|||
if (parse.containsKey(trace) && (Boolean) parse.get(trace) == true) { |
|||
log.info("测试流程,插入队首"); |
|||
QueueUtil.taskQueue.putFirst(dataJson); |
|||
} else { |
|||
QueueUtil.taskQueue.put(dataJson); |
|||
} |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
log.error("任务添加队列异常" + e.getMessage()); |
|||
responsePo.setCode(ResponseCode.INTERNAL_SERVER_ERROR.getCode()); |
|||
responsePo.setMessage(ResponseCode.INTERNAL_SERVER_ERROR.getMessage()); |
|||
return responsePo; |
|||
} catch (Exception e) { |
|||
log.error("请求格式发生异常" + e.getMessage()); |
|||
responsePo.setCode(ResponseCode.FAILURE.getCode()); |
|||
responsePo.setMessage(ResponseCode.FAILURE.getMessage()); |
|||
return responsePo; |
|||
} |
|||
return responsePo; |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
package com.bfd.crawl.handlerdata.enums; |
|||
|
|||
/**
 * Result codes returned to API clients, each paired with a default message.
 *
 * @author jinming
 * @version 1.0
 */
public enum ResponseCode {
    /** The operation succeeded. */
    SUCCESS(200, "操作成功"),
    /** The request parameters were invalid. */
    FAILURE(400, "参数错误"),
    /** An unexpected server-side error occurred. */
    INTERNAL_SERVER_ERROR(500, "服务器内部错误"),
    /** The submitted file type is not supported. */
    TYPE_NOT_SUPPORT(601, "文件类型不支持");

    /** Numeric code sent to the client; immutable like every enum state should be. */
    private final int code;

    /** Default message sent together with the code. */
    private final String message;

    ResponseCode(int code, String message) {
        this.code = code;
        this.message = message;
    }

    /**
     * @return the numeric code of this result
     */
    public int getCode() {
        return code;
    }

    /**
     * @return the default message of this result
     */
    public String getMessage() {
        return message;
    }
}
|||
@ -0,0 +1,31 @@ |
|||
package com.bfd.crawl.handlerdata.exception; |
|||
|
|||
|
|||
import com.bfd.crawl.handlerdata.bean.ResponsePo; |
|||
import com.bfd.crawl.handlerdata.enums.ResponseCode; |
|||
import org.springframework.http.HttpStatus; |
|||
import org.springframework.web.bind.annotation.ExceptionHandler; |
|||
import org.springframework.web.bind.annotation.ResponseStatus; |
|||
import org.springframework.web.bind.annotation.RestControllerAdvice; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:GlobalExceptionHandler |
|||
* @version:1.0 |
|||
* @description: 异常处理类 |
|||
* @Date:2023/2/28 16:29 |
|||
*/ |
|||
@RestControllerAdvice |
|||
public class GlobalExceptionHandler { |
|||
|
|||
@ExceptionHandler(value = {IllegalArgumentException.class}) |
|||
@ResponseStatus(HttpStatus.BAD_REQUEST) |
|||
public ResponsePo handleBadRequest(Exception ex) { |
|||
return new ResponsePo(ResponseCode.FAILURE.getCode(), ex.getMessage()); |
|||
} |
|||
@ExceptionHandler(value = {Exception.class}) |
|||
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) |
|||
public ResponsePo handleException(Exception ex) { |
|||
return new ResponsePo(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), ex.getMessage()); |
|||
} |
|||
} |
|||
@ -0,0 +1,424 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.bfd.crawl.handlerdata.bean.Constants; |
|||
import com.bfd.crawl.handlerdata.util.QueueUtil; |
|||
import com.bfd.crawl.handlerdata.util.StringUtil; |
|||
import com.google.gson.Gson; |
|||
import com.google.gson.GsonBuilder; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import okhttp3.*; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.io.IOException; |
|||
import java.util.*; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:ElasticSearchService |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2024/7/10 18:04 |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
public class ElasticSearchService { |
|||
@Value("${thread.sleep}") |
|||
private int sleepTimes; |
|||
|
|||
@Resource |
|||
private StringRedisTemplate stringRedisTemplate; |
|||
|
|||
public void performQuery(Map parse) throws Exception { |
|||
String errorMessage = ""; |
|||
Gson gson = new GsonBuilder().serializeNulls().create(); |
|||
try { |
|||
|
|||
OkHttpClient client = new OkHttpClient().newBuilder() |
|||
.build(); |
|||
int id = (int) parse.get("id"); |
|||
int scenesId = (int) parse.get("scenes_id"); |
|||
Map admin = (Map) parse.get("input"); |
|||
List<String> taskIds = (List<String>) admin.get("taskId"); |
|||
String indexs = admin.get("Index").toString(); |
|||
String host = (String) admin.get("host"); |
|||
String port = (String) admin.get("port"); |
|||
String requestUrl = "http://"; |
|||
requestUrl = requestUrl.concat("172.26.250.44").concat(":").concat("9200").concat("/"); |
|||
String scoreUrl = requestUrl.concat("_search/scroll"); |
|||
requestUrl = requestUrl.concat("cl_special_1.0_").concat(indexs).concat("/_search?scroll=5m"); |
|||
int index = 1; |
|||
int dataIndex = 1; |
|||
int total = 0; |
|||
String scrollId = ""; |
|||
log.info("任务ID:" + id + "的查询连接为:" + requestUrl); |
|||
//总数据量统计 |
|||
int allDataTotal = 0; |
|||
for (String taskId : taskIds) { |
|||
String queryQl = makeQueryQl(admin, index, String.valueOf(scenesId), taskId); |
|||
log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的查询条件为:" + queryQl); |
|||
allDataTotal += getDataTotal(requestUrl, queryQl); |
|||
} |
|||
log.info("任务ID:" + id + "的总数据量为:" + allDataTotal + "共需要查询" + Math.ceil(allDataTotal / 500f) + "次"); |
|||
for (String taskId : taskIds) { |
|||
index = 1; |
|||
total = 0; |
|||
try { |
|||
String queryQl = null; |
|||
try { |
|||
queryQl = makeQueryQl(admin, index, String.valueOf(scenesId), taskId); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
errorMessage = "查询语句组装失败"; |
|||
} |
|||
if (total == 0) { |
|||
total = getDataTotal(requestUrl, queryQl); |
|||
} |
|||
log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的查询条件为:" + queryQl); |
|||
log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的总数据量为:" + total + "共需要查询" + Math.ceil(total / 500f) + "次"); |
|||
MediaType mediaType = MediaType.parse("application/json"); |
|||
RequestBody body = RequestBody.create(mediaType, queryQl); |
|||
Request firstRequest = new Request.Builder() |
|||
.url(requestUrl) |
|||
.method("POST", body) |
|||
.addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") |
|||
.addHeader("Content-Type", "application/json") |
|||
.build(); |
|||
String firstResultString = null; |
|||
try { |
|||
Response firstResponse = client.newCall(firstRequest).execute(); |
|||
firstResultString = firstResponse.body().string(); |
|||
} catch (IOException e) { |
|||
e.printStackTrace(); |
|||
errorMessage = "ES 链接失败"; |
|||
} |
|||
Map resultParse = (Map) JSON.parse(firstResultString); |
|||
scrollId = (String) resultParse.get("_scroll_id"); |
|||
Map hitsMap = (Map) resultParse.get("hits"); |
|||
List<Map> hits = (List<Map>) hitsMap.get("hits"); |
|||
for (Map hitMap : hits) { |
|||
Map source = (Map) hitMap.get("_source"); |
|||
source.put("subjectId", indexs); |
|||
Map result = new HashMap(32); |
|||
Map resultData = new HashMap(32); |
|||
//获取输出字段 |
|||
Map<String, Object> output = (Map<String, Object>) parse.get("output"); |
|||
for (String key : output.keySet()) { |
|||
if (source.containsKey(key)) { |
|||
resultData.put(key, source.get(key)); |
|||
} |
|||
} |
|||
if (dataIndex == allDataTotal) { |
|||
resultData.put("isLast", 1); |
|||
Thread.sleep(sleepTimes * 1000); |
|||
} |
|||
result.put("results", JSONObject.toJSONString(resultData)); |
|||
result.put("status", 1); |
|||
parse.put("result", result); |
|||
String message = gson.toJson(parse); |
|||
if (!StringUtil.hasValue(errorMessage)) { |
|||
QueueUtil.sendQueue.put(message); |
|||
} |
|||
dataIndex++; |
|||
} |
|||
log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的第" + index + "次查询"); |
|||
index++; |
|||
do { |
|||
Map newQlMap = new HashMap(32); |
|||
newQlMap.put("scroll", "5m"); |
|||
newQlMap.put("scroll_id", scrollId); |
|||
MediaType mediaTypeScroll = MediaType.parse("application/json"); |
|||
RequestBody bodyScroll = RequestBody.create(mediaTypeScroll, JSON.toJSONString(newQlMap)); |
|||
Request scrollRequest = new Request.Builder() |
|||
.url(scoreUrl) |
|||
.method("POST", bodyScroll).addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") |
|||
.addHeader("Content-Type", "application/json") |
|||
.build(); |
|||
String scrollResultString = null; |
|||
try { |
|||
Response scrollResponse = client.newCall(scrollRequest).execute(); |
|||
scrollResultString = scrollResponse.body().string(); |
|||
} catch (IOException e) { |
|||
e.printStackTrace(); |
|||
errorMessage = "ES 链接失败"; |
|||
} |
|||
Map scrollResultParse = (Map) JSON.parse(scrollResultString); |
|||
|
|||
Map scrollHitsMap = (Map) scrollResultParse.get("hits"); |
|||
hits = (List<Map>) scrollHitsMap.get("hits"); |
|||
for (Map hitMap : hits) { |
|||
Map source = (Map) hitMap.get("_source"); |
|||
source.put("subjectId", indexs); |
|||
Map result = new HashMap(32); |
|||
Map resultData = new HashMap(32); |
|||
//获取输出字段 |
|||
Map<String, Object> output = (Map<String, Object>) parse.get("output"); |
|||
for (String key : output.keySet()) { |
|||
if (source.containsKey(key)) { |
|||
resultData.put(key, source.get(key)); |
|||
} |
|||
} |
|||
if (dataIndex == allDataTotal) { |
|||
resultData.put("isLast", 1); |
|||
Thread.sleep(sleepTimes * 1000); |
|||
} |
|||
result.put("status", 1); |
|||
result.put("message", errorMessage); |
|||
result.put("results", JSONObject.toJSONString(resultData)); |
|||
parse.put("result", result); |
|||
String message = gson.toJson(parse); |
|||
if (!StringUtil.hasValue(errorMessage)) { |
|||
QueueUtil.sendQueue.put(message); |
|||
} |
|||
dataIndex++; |
|||
} |
|||
log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的第" + index + "次查询"); |
|||
index++; |
|||
} while (hits.size() != 0 && hits.size() == 500); |
|||
log.info("任务ID:" + id + "查询结束"); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
stringRedisTemplate.opsForHash().put("selection", String.valueOf(scenesId), String.valueOf(System.currentTimeMillis())); |
|||
} catch (Throwable e) { |
|||
log.error("处理程序发生异常:" + e.getMessage()); |
|||
e.printStackTrace(); |
|||
errorMessage = "未知异常"; |
|||
Map result = new HashMap(32); |
|||
result.put("status", 2); |
|||
result.put("results", ""); |
|||
result.put("message", errorMessage); |
|||
parse.put("result", result); |
|||
String message = gson.toJson(parse); |
|||
try { |
|||
QueueUtil.sendQueue.put(message); |
|||
} catch (InterruptedException ex) { |
|||
ex.printStackTrace(); |
|||
} |
|||
} |
|||
if (StringUtil.hasValue(errorMessage)) { |
|||
Map result = new HashMap(32); |
|||
result.put("status", 2); |
|||
result.put("results", ""); |
|||
result.put("message", errorMessage); |
|||
parse.put("result", result); |
|||
String message = gson.toJson(parse); |
|||
try { |
|||
QueueUtil.sendQueue.put(message); |
|||
} catch (InterruptedException ex) { |
|||
ex.printStackTrace(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private int getDataTotal(String requestUrl, String queryQl) { |
|||
int total = 0; |
|||
try { |
|||
OkHttpClient client = new OkHttpClient().newBuilder() |
|||
.build(); |
|||
MediaType mediaType = MediaType.parse("application/json"); |
|||
RequestBody body = RequestBody.create(mediaType, queryQl); |
|||
Request firstRequest = new Request.Builder() |
|||
.url(requestUrl) |
|||
.method("POST", body) |
|||
.addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") |
|||
.addHeader("Content-Type", "application/json") |
|||
.build(); |
|||
Response firstResponse = client.newCall(firstRequest).execute(); |
|||
String firstResultString = firstResponse.body().string(); |
|||
Map resultParse = (Map) JSON.parse(firstResultString); |
|||
Map hitsMap = (Map) resultParse.get("hits"); |
|||
Map totalMap = (Map) hitsMap.get("total"); |
|||
total = (int) totalMap.get("value"); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
return total; |
|||
} |
|||
|
|||
public String makeQueryQl(Map admin, int pageIndex, String scenesId, String... taskId) { |
|||
int dataRange = 1; |
|||
try { |
|||
dataRange = (int) admin.get("dataRange"); |
|||
} catch (Exception e) { |
|||
} |
|||
int size = 500; |
|||
int from = (pageIndex - 1) * size; |
|||
Map queryQl = new HashMap(32); |
|||
queryQl.put("from", from); |
|||
queryQl.put("size", size); |
|||
Map queryMap = new HashMap(32); |
|||
Map boolMap = new HashMap(32); |
|||
List<Map> mustList = new ArrayList<>(); |
|||
List<Map> shouldList = new ArrayList<>(); |
|||
List<Map> mustNotList = new ArrayList<>(); |
|||
if (taskId.length > 0) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
match.put("taskId", taskId[0]); |
|||
matchMap.put("match", match); |
|||
mustList.add(matchMap); |
|||
} |
|||
if (admin.containsKey(Constants.TIME)) { |
|||
Map timeCondition = new HashMap(32); |
|||
Map time = (Map) admin.get(Constants.TIME); |
|||
Long startTime = (Long) time.get(Constants.START_TIME); |
|||
Long endTime = (Long) time.get(Constants.END_TIME); |
|||
Map rangeMap = new HashMap(32); |
|||
Map pubTimeMap = new HashMap(32); |
|||
pubTimeMap.put("gte", startTime); |
|||
pubTimeMap.put("lte", endTime); |
|||
rangeMap.put("pubTime", pubTimeMap); |
|||
timeCondition.put("range", rangeMap); |
|||
mustList.add(timeCondition); |
|||
} |
|||
if (dataRange == 0) { |
|||
long nowtime = System.currentTimeMillis(); |
|||
long startTime = 0L; |
|||
|
|||
//取上次查询的抓取时间来进行时间的过滤,只针对增量逻辑 |
|||
boolean hasTime = stringRedisTemplate.opsForHash().hasKey("selection", scenesId); |
|||
if (hasTime) { |
|||
long lastQueryTime = Long.valueOf(stringRedisTemplate.opsForHash().get("selection", scenesId).toString()); |
|||
boolean isNew = nowtime - lastQueryTime > 10 * 60 * 1000; |
|||
if (isNew) { |
|||
startTime = lastQueryTime; |
|||
} |
|||
} |
|||
Map timeCondition = new HashMap(32); |
|||
Map rangeMap = new HashMap(32); |
|||
Map crawlTimeMap = new HashMap(32); |
|||
crawlTimeMap.put("gte", startTime); |
|||
rangeMap.put("crawlTime", crawlTimeMap); |
|||
timeCondition.put("range", rangeMap); |
|||
mustList.add(timeCondition); |
|||
//操作后放入当前时间,作为下次查询的条件 |
|||
} |
|||
|
|||
if (admin.containsKey(Constants.OR)) { |
|||
List<List<Map<String, String>>> ors = (List<List<Map<String, String>>>) admin.get(Constants.OR); |
|||
int loopIndex = 0; |
|||
|
|||
for (List<Map<String, String>> or : ors) { |
|||
|
|||
for (Map<String, String> stringStringMap : or) { |
|||
String type = stringStringMap.get(Constants.TYPE); |
|||
String title = stringStringMap.get(Constants.TITLE); |
|||
String author = stringStringMap.get(Constants.AUTHOR); |
|||
String content = stringStringMap.get(Constants.CONTENT); |
|||
switch (type) { |
|||
case Constants.INCLUDE: |
|||
if (StringUtil.hasValue(title)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
Map contentMap = new HashMap(32); |
|||
contentMap.put("query", title); |
|||
match.put("title", contentMap); |
|||
matchMap.put("match_phrase", match); |
|||
if (loopIndex > 0) { |
|||
shouldList.add(matchMap); |
|||
} else { |
|||
mustList.add(matchMap); |
|||
} |
|||
|
|||
} |
|||
if (StringUtil.hasValue(content)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
Map contentMap = new HashMap(32); |
|||
contentMap.put("query", content); |
|||
contentMap.put("boost", 5); |
|||
match.put("content", contentMap); |
|||
matchMap.put("match_phrase", match); |
|||
if (loopIndex > 0) { |
|||
shouldList.add(matchMap); |
|||
} else { |
|||
mustList.add(matchMap); |
|||
} |
|||
} |
|||
if (StringUtil.hasValue(author)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
Map contentMap = new HashMap(32); |
|||
contentMap.put("query", author); |
|||
contentMap.put("boost", 5); |
|||
match.put("author", contentMap); |
|||
matchMap.put("match_phrase", match); |
|||
if (loopIndex > 0) { |
|||
shouldList.add(matchMap); |
|||
} else { |
|||
mustList.add(matchMap); |
|||
} |
|||
} |
|||
break; |
|||
case Constants.EXCLUSION: |
|||
if (StringUtil.hasValue(title)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
match.put("title", title); |
|||
matchMap.put("match", match); |
|||
mustNotList.add(matchMap); |
|||
} |
|||
if (StringUtil.hasValue(content)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
match.put("content", content); |
|||
matchMap.put("match", match); |
|||
mustNotList.add(matchMap); |
|||
} |
|||
if (StringUtil.hasValue(author)) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
match.put("author", author); |
|||
matchMap.put("match", match); |
|||
mustNotList.add(matchMap); |
|||
} |
|||
break; |
|||
default: |
|||
break; |
|||
} |
|||
} |
|||
loopIndex++; |
|||
} |
|||
} |
|||
if (admin.containsKey(Constants.ATTACHMENTFILTER)) { |
|||
Map<String, Boolean> attachmentFilter = (Map<String, Boolean>) admin.get(Constants.ATTACHMENTFILTER); |
|||
Set<String> attachmentFilterKeys = attachmentFilter.keySet(); |
|||
for (String attachmentFilterKey : attachmentFilterKeys) { |
|||
boolean aBoolean = attachmentFilter.get(attachmentFilterKey); |
|||
if (aBoolean) { |
|||
Map matchMap = new HashMap(32); |
|||
Map match = new HashMap(32); |
|||
match.put(attachmentFilterKey, 1); |
|||
matchMap.put("term", match); |
|||
mustList.add(matchMap); |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (shouldList.size() > 0) { |
|||
boolMap.put("should", shouldList); |
|||
} |
|||
if (mustList.size() > 0) { |
|||
boolMap.put("must", mustList); |
|||
} |
|||
if (mustNotList.size() > 0) { |
|||
boolMap.put("must_not", mustNotList); |
|||
} |
|||
if (boolMap.isEmpty()) { |
|||
Map matchAll = new HashMap(32); |
|||
queryMap.put("match_all", matchAll); |
|||
} else { |
|||
queryMap.put("bool", boolMap); |
|||
} |
|||
queryQl.put("query", queryMap); |
|||
return JSON.toJSONString(queryQl); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,115 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.bfd.crawl.handlerdata.bean.Constants; |
|||
import com.bfd.crawl.handlerdata.util.CronExpressionGeneratorUtil; |
|||
import com.bfd.crawl.handlerdata.util.QueueUtil; |
|||
import com.bfd.crawl.handlerdata.util.StringUtil; |
|||
import com.bfd.util.PauseTool; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import okhttp3.*; |
|||
import org.apache.catalina.valves.CrawlerSessionManagerValve; |
|||
import org.checkerframework.checker.units.qual.A; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
import java.util.*; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:HandlerService |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2023/7/13 15:03 |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
public class HandlerService { |
|||
|
|||
@Autowired |
|||
private ElasticSearchService elasticSearchService; |
|||
|
|||
@Resource |
|||
private StringRedisTemplate stringRedisTemplate; |
|||
@Autowired |
|||
private TaskService taskService; |
|||
|
|||
@Async("asyncExecutor") |
|||
void doSearch() { |
|||
|
|||
while (true) { |
|||
if (QueueUtil.taskQueue.size() > 0) { |
|||
String dataJson = null; |
|||
try { |
|||
dataJson = QueueUtil.taskQueue.take(); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
continue; |
|||
} |
|||
Map parse = (Map) JSON.parse(dataJson); |
|||
int scenesId = (int) parse.get("scenes_id"); |
|||
int version = (int) parse.get("version"); |
|||
String pauseKey = scenesId + "_" + version; |
|||
if (!PauseTool.CACHE.containsKey(pauseKey)) { |
|||
log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version); |
|||
continue; |
|||
} |
|||
int id = (int) parse.get("id"); |
|||
Map admin = (Map) parse.get("input"); |
|||
int dataUpdate = 0; |
|||
try { |
|||
dataUpdate = (int) admin.get("dataUpdate"); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
try { |
|||
elasticSearchService.performQuery(parse); |
|||
if (dataUpdate == 0) { |
|||
continue; |
|||
} |
|||
Map cycle = (Map) admin.get("cycle"); |
|||
String unit = (String) cycle.get("unit"); |
|||
String time = null; |
|||
try { |
|||
time = (String) cycle.get("time"); |
|||
} catch (Exception e) { |
|||
|
|||
} |
|||
List<Integer> date = null; |
|||
try { |
|||
date = (List<Integer>) cycle.get("date"); |
|||
} catch (Exception e) { |
|||
|
|||
} |
|||
String cronExpression = CronExpressionGeneratorUtil.generateCronExpression(unit, date, time); |
|||
taskService.scheduleTask(String.valueOf(scenesId), () -> { |
|||
try { |
|||
elasticSearchService.performQuery(parse); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
}, cronExpression); |
|||
|
|||
stringRedisTemplate.opsForHash().put("selectionTask", String.valueOf(scenesId), dataJson); |
|||
log.info("流程{}的定时任务已经启动,对应的cron表达式为:{}", scenesId, cronExpression); |
|||
} catch (Exception e) { |
|||
throw new RuntimeException(e); |
|||
} |
|||
} else { |
|||
log.info("任务队列为空,休眠3秒"); |
|||
try { |
|||
Thread.sleep(3000); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import com.bfd.crawl.handlerdata.util.QueueUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.kafka.core.KafkaTemplate; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:SendService |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2023/7/31 17:53 |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
public class SendService { |
|||
@Value("${send.topic}") |
|||
private String topic; |
|||
@Autowired |
|||
private KafkaTemplate kafkaTemplate; |
|||
|
|||
@Async("sendExecutor") |
|||
void sendToKafka() { |
|||
while (true) { |
|||
if (QueueUtil.sendQueue.size() > 0) { |
|||
try { |
|||
String message = QueueUtil.sendQueue.take(); |
|||
kafkaTemplate.send(topic,message); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
}else { |
|||
log.info("任务队列为空,休眠3秒"); |
|||
try { |
|||
Thread.sleep(3000); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,67 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import com.bfd.crawl.handlerdata.util.QueueUtil; |
|||
import com.bfd.util.PauseTool; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.boot.ApplicationArguments; |
|||
import org.springframework.boot.ApplicationRunner; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.util.Set; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:StartServcie |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2023/7/31 17:14 |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
public class StartServcie implements ApplicationRunner { |
|||
@Value("${thread.handler}") |
|||
private int handlerNumber; |
|||
|
|||
@Value("${thread.send}") |
|||
private int sendNumber; |
|||
@Autowired |
|||
private HandlerService handlerService; |
|||
@Autowired |
|||
private SendService sendService; |
|||
|
|||
@Value("${zookeeper.connection-string}") |
|||
private String connectionString; |
|||
@Value("${zookeeper.publish-node}") |
|||
private String nodePath; |
|||
@Resource |
|||
private StringRedisTemplate stringRedisTemplate; |
|||
|
|||
@Override |
|||
public void run(ApplicationArguments args) throws Exception { |
|||
PauseTool pauseTool = new PauseTool(); |
|||
pauseTool.initializeRedisCache(stringRedisTemplate); |
|||
pauseTool.setupZookeeperListener(connectionString, nodePath); |
|||
try { |
|||
Set<Object> selectionTask = stringRedisTemplate.opsForHash().keys("selectionTask"); |
|||
for (Object o : selectionTask) { |
|||
String dataJson = (String) stringRedisTemplate.opsForHash().get("selectionTask", o); |
|||
QueueUtil.taskQueue.put(dataJson); |
|||
} |
|||
log.info("已从缓存加载" + selectionTask.size() + "条任务"); |
|||
} catch (Exception e) { |
|||
} |
|||
for (int i = 0; i < handlerNumber; i++) { |
|||
log.info("处理服务线程" + i + "已启动 "); |
|||
handlerService.doSearch(); |
|||
} |
|||
for (int i = 0; i < sendNumber; i++) { |
|||
log.info("发送服务线程" + i + "已启动 "); |
|||
sendService.sendToKafka(); |
|||
} |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
|||
import org.springframework.scheduling.support.CronTrigger; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ScheduledFuture; |
|||
|
|||
/** |
|||
* @author:jinming |
|||
* @className:TaskService |
|||
* @version:1.0 |
|||
* @description: |
|||
* @Date:2024/7/10 17:21 |
|||
*/ |
|||
@Service |
|||
|
|||
public class TaskService { |
|||
|
|||
@Autowired |
|||
private ThreadPoolTaskScheduler taskScheduler; |
|||
|
|||
private ConcurrentHashMap<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>(); |
|||
|
|||
public void scheduleTask(String taskId, Runnable task, String cronExpression) { |
|||
if (scheduledTasks.containsKey(taskId)) { |
|||
scheduledTasks.get(taskId).cancel(false); |
|||
} |
|||
|
|||
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(task, new CronTrigger(cronExpression)); |
|||
scheduledTasks.put(taskId, scheduledFuture); |
|||
} |
|||
|
|||
public void cancelTask(String taskId) { |
|||
if (scheduledTasks.containsKey(taskId)) { |
|||
scheduledTasks.get(taskId).cancel(false); |
|||
scheduledTasks.remove(taskId); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,64 @@ |
|||
package com.bfd.crawl.handlerdata.service; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.curator.framework.CuratorFramework; |
|||
import org.apache.curator.framework.recipes.cache.NodeCache; |
|||
import org.apache.curator.framework.recipes.cache.NodeCacheListener; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* @author jian.mao |
|||
* @date 2024年4月17日 |
|||
* @description |
|||
*/ |
|||
@Component |
|||
@Slf4j |
|||
public class ZookeeperNodeMonitor { |
|||
|
|||
@Resource |
|||
private StringRedisTemplate stringRedisTemplate; |
|||
@Autowired |
|||
private CuratorFramework curatorFramework; |
|||
@Value("${zookeeper.publish-node}") |
|||
private String nodePath; |
|||
@Autowired |
|||
private TaskService taskService; |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
try { |
|||
// 创建节点监听器 |
|||
NodeCache nodeCache = new NodeCache(curatorFramework, nodePath); |
|||
nodeCache.start(); |
|||
|
|||
// 监听节点变化 |
|||
nodeCache.getListenable().addListener(new NodeCacheListener() { |
|||
@Override |
|||
public void nodeChanged() throws Exception { |
|||
byte[] data = nodeCache.getCurrentData().getData(); |
|||
String nodeData = new String(data); |
|||
log.info("Node data changed: " + nodeData); |
|||
JSONObject jsonObject = JSON.parseObject(nodeData); |
|||
int scenesId = jsonObject.getIntValue("scenes_id"); |
|||
String operation = jsonObject.getString("operation"); |
|||
String stopStatus = "stop"; |
|||
if (operation.equals(stopStatus)) { |
|||
stringRedisTemplate.opsForHash().delete("selection", String.valueOf(scenesId)); |
|||
stringRedisTemplate.opsForHash().delete("selectionTask", String.valueOf(scenesId)); |
|||
taskService.cancelTask(String.valueOf(scenesId)); |
|||
} |
|||
} |
|||
}); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
package com.bfd.crawl.handlerdata.util; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Arrays; |
|||
import java.util.List; |
|||
|
|||
/**
 * Builds six-field cron expressions from a schedule unit, an optional list of days and an
 * {@code HH:mm} time of day.
 *
 * @author jinming
 * @version 1.0
 * @date 2024/7/10 17:43
 */
public class CronExpressionGeneratorUtil {
    /**
     * Real-time (treated here as once per minute).
     */
    private static final String UNIT_REAL_TIME = "0";
    /**
     * Every day.
     */
    private static final String UNIT_DAILY = "1";
    /**
     * Every week.
     */
    private static final String UNIT_WEEKLY = "2";
    /**
     * Every month.
     */
    private static final String UNIT_MONTHLY = "3";

    /**
     * Generates the cron expression for the given schedule.
     *
     * @param unit   schedule unit: {@code "0"} every minute, {@code "1"} daily,
     *               {@code "2"} weekly, {@code "3"} monthly
     * @param datess days of week (weekly) or days of month (monthly); ignored otherwise
     * @param time   time of day as {@code HH:mm}; ignored for unit {@code "0"}
     * @return the cron expression
     * @throws IllegalArgumentException if the unit is unknown, or if the time or days
     *                                  required by the unit are missing or malformed
     */
    public static String generateCronExpression(String unit, List<Integer> datess, String time) {
        if (unit == null) {
            throw new IllegalArgumentException("Invalid unit");
        }
        switch (unit) {
            case UNIT_REAL_TIME:
                return "0 * * * * ?";
            case UNIT_DAILY: {
                String[] hm = parseTime(time);
                return String.format("0 %s %s * * ?", hm[1], hm[0]);
            }
            case UNIT_WEEKLY: {
                String[] hm = parseTime(time);
                return String.format("0 %s %s ? * %s", hm[1], hm[0], joinDays(datess));
            }
            case UNIT_MONTHLY: {
                String[] hm = parseTime(time);
                return String.format("0 %s %s %s * ?", hm[1], hm[0], joinDays(datess));
            }
            default:
                throw new IllegalArgumentException("Invalid unit");
        }
    }

    /**
     * Splits an {@code HH:mm} string into {@code [hours, minutes]}, keeping the original
     * digits (e.g. a leading zero) but rejecting values that are missing or out of range.
     */
    private static String[] parseTime(String time) {
        if (time == null) {
            throw new IllegalArgumentException("time is required, expected HH:mm");
        }
        String[] parts = time.trim().split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Invalid time, expected HH:mm: " + time);
        }
        String hours = parts[0].trim();
        String minutes = parts[1].trim();
        try {
            int h = Integer.parseInt(hours);
            int m = Integer.parseInt(minutes);
            if (h < 0 || h > 23 || m < 0 || m > 59) {
                throw new IllegalArgumentException("Time out of range, expected HH:mm: " + time);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid time, expected HH:mm: " + time, e);
        }
        return new String[]{hours, minutes};
    }

    /**
     * Joins the day numbers with commas; an empty or missing list is rejected because it
     * would leave the cron field blank.
     */
    private static String joinDays(List<Integer> days) {
        if (days == null || days.isEmpty()) {
            throw new IllegalArgumentException("At least one day is required for this unit");
        }
        StringBuilder sb = new StringBuilder();
        // Iterate as Object: the list usually comes from untyped JSON, so check each element.
        for (Object day : days) {
            if (!(day instanceof Number)) {
                throw new IllegalArgumentException("Invalid day value: " + day);
            }
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(((Number) day).intValue());
        }
        return sb.toString();
    }
}
|||
@ -0,0 +1,19 @@ |
|||
package com.bfd.crawl.handlerdata.util; |
|||
|
|||
import java.util.LinkedHashMap; |
|||
import java.util.Set; |
|||
import java.util.concurrent.LinkedBlockingDeque; |
|||
|
|||
/**
 * Process-wide in-memory queues shared between the services.
 *
 * @author jinming
 * @version 1.0
 * @date 2023/7/13 15:00
 */
public class QueueUtil {

    /** Task payloads (JSON strings) waiting to be handled. */
    public static final LinkedBlockingDeque<String> taskQueue = new LinkedBlockingDeque<String>();

    /** Messages waiting to be sent out. */
    public static final LinkedBlockingDeque<String> sendQueue = new LinkedBlockingDeque<String>();

}
|||
@ -0,0 +1,94 @@ |
|||
package com.bfd.crawl.handlerdata.util; |
|||
|
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.security.MessageDigest; |
|||
import java.util.HashSet; |
|||
import java.util.Set; |
|||
import java.util.regex.Matcher; |
|||
import java.util.regex.Pattern; |
|||
|
|||
/** |
|||
* @author jinming |
|||
* @version 1.0 |
|||
* @className StringUtile |
|||
* @Date 2022/1/21 11:46 |
|||
*/ |
|||
@Slf4j |
|||
public class StringUtil { |
|||
public static boolean hasValue(String str) { |
|||
return str != null && !"".equals(str.trim()); |
|||
} |
|||
|
|||
public static String getRegexGroup(String regex, String str, int id) { |
|||
String resultStr = ""; |
|||
if (hasValue(str)) { |
|||
Pattern p = Pattern.compile(regex); |
|||
Matcher m = p.matcher(str); |
|||
if (m.find()) { |
|||
resultStr = m.group(id); |
|||
} |
|||
} |
|||
|
|||
if ("".equals(resultStr)) { |
|||
} |
|||
|
|||
return resultStr; |
|||
} |
|||
|
|||
public static Set<String> getEmailAddress(String message) { |
|||
Set<String> emailList = new HashSet<>(); |
|||
Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+"); |
|||
Matcher m = pattern.matcher(message); |
|||
while (m.find()) { |
|||
emailList.add(m.group(0)); |
|||
} |
|||
return emailList; |
|||
} |
|||
public static String getMd5(String string) { |
|||
try { |
|||
MessageDigest md5 = MessageDigest.getInstance("MD5"); |
|||
byte[] bs = md5.digest(string.getBytes("UTF-8")); |
|||
StringBuilder sb = new StringBuilder(40); |
|||
for (byte x : bs) { |
|||
if ((x & 0xff) >> 4 == 0) { |
|||
sb.append("0").append(Integer.toHexString(x & 0xff)); |
|||
} else { |
|||
sb.append(Integer.toHexString(x & 0xff)); |
|||
} |
|||
} |
|||
return sb.toString(); |
|||
} catch (Exception e) { |
|||
//LOG.error("获取md5异常", e); |
|||
return "nceaform" + System.currentTimeMillis(); |
|||
} |
|||
} |
|||
|
|||
public static String removeAllHtmlTags(String str) { |
|||
return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : ""; |
|||
} |
|||
|
|||
public static String getRegexGroup(Pattern regex, String str, int id) { |
|||
String resultStr = ""; |
|||
if (hasValue(str)) { |
|||
Matcher m = regex.matcher(str); |
|||
if (m.find()) { |
|||
resultStr = m.group(id); |
|||
} |
|||
} |
|||
|
|||
if ("".equals(resultStr)) { |
|||
log.error(regex + " parser error!"); |
|||
} |
|||
|
|||
return resultStr; |
|||
} |
|||
|
|||
public static String getStrByPattern(String str, String regex) { |
|||
Pattern pattern = Pattern.compile(regex); |
|||
Matcher m = pattern.matcher(str); |
|||
return m.find() ? m.group(0) : ""; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,59 @@ |
|||
server: |
|||
port: 7080 |
|||
spring: |
|||
application: |
|||
name: handlerData |
|||
boot: |
|||
admin: |
|||
client: |
|||
health: |
|||
timeout: 10s |
|||
url: http://172.16.12.55:8001 |
|||
instance: |
|||
service-base-url: http://10.10.144.49:8080 |
|||
kafka: |
|||
bootstrap-servers: 172.26.28.30:9092 |
|||
producer: |
|||
retries: 3 |
|||
acks: all |
|||
batch-size: 4096 |
|||
buffer-memory: 102476800 |
|||
key-serializer: org.apache.kafka.common.serialization.StringSerializer |
|||
value-serializer: org.apache.kafka.common.serialization.StringSerializer |
|||
redis: |
|||
host: 172.18.1.101 |
|||
port: 6379 |
|||
timeout: 10000 |
|||
database: 5 |
|||
jedis: |
|||
pool: |
|||
max-active: 8 # 连接池最大连接数(使用负值表示没有限制) |
|||
max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制) |
|||
|
|||
management: |
|||
endpoints: |
|||
web: |
|||
exposure: |
|||
include: "*" |
|||
endpoint: |
|||
health: |
|||
show-details: always |
|||
health: |
|||
elasticsearch: |
|||
enabled: false |
|||
|
|||
logging: |
|||
file: |
|||
path: ./logs |
|||
level: |
|||
de.codecentric.boot.admin.client: DEBUG |
|||
zookeeper: |
|||
connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181 |
|||
publish-node: /analyze |
|||
|
|||
thread: |
|||
handler: 5 |
|||
send: 50 |
|||
sleep: 10 |
|||
send: |
|||
topic: analyze123213 |
|||
@ -0,0 +1,39 @@ |
|||
<configuration> |
|||
<!-- 属性文件:在properties文件中找到对应的配置项 --> |
|||
<springProperty scope="context" name="logging.file.path" source="logging.file.path"/> |
|||
<springProperty scope="context" name="logging.level" source="logging.level"/> |
|||
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 --> |
|||
<appender name="STDOUT" |
|||
class="ch.qos.logback.core.ConsoleAppender"> |
|||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> |
|||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern> |
|||
</encoder> |
|||
</appender> |
|||
|
|||
<appender name="GLMAPPER-LOGGERONE" |
|||
class="ch.qos.logback.core.rolling.RollingFileAppender"> |
|||
<append>true</append> |
|||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> |
|||
<level>${logging.level}</level> |
|||
</filter> |
|||
<file> |
|||
${logging.file.path}/handler-data.log |
|||
</file> |
|||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> |
|||
<FileNamePattern>${logging.file.path}/handler-data.log.%d{yyyy-MM-dd}</FileNamePattern> |
|||
<MaxHistory>3</MaxHistory> |
|||
</rollingPolicy> |
|||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> |
|||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern> |
|||
<charset>UTF-8</charset> |
|||
</encoder> |
|||
</appender> |
|||
|
|||
|
|||
<root level="info"> |
|||
|
|||
<appender-ref ref="GLMAPPER-LOGGERONE"/> |
|||
<appender-ref ref="STDOUT"/> |
|||
</root> |
|||
<logger name="de.codecentric.boot.admin.client" level="TRACE"/> |
|||
</configuration> |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue