From f4b9fae646f8d6c2c7e857892b0c965c6c611cda Mon Sep 17 00:00:00 2001 From: 55007 <55007@maojian> Date: Tue, 7 Jan 2025 18:04:18 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=87=E9=9B=86=E5=B9=B3=E5=8F=B0=E5=BA=94?= =?UTF-8?q?=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 33 ++ README.md | 1 + pom.xml | 124 ++++++ .../crawl/handlerdata/HandlerDataApplication.java | 30 ++ .../com/bfd/crawl/handlerdata/bean/Constants.java | 54 +++ .../com/bfd/crawl/handlerdata/bean/ResponsePo.java | 59 +++ .../config/AsyncThreadConfiguration.java | 48 +++ .../crawl/handlerdata/config/SchedulerConfig.java | 25 ++ .../crawl/handlerdata/config/ZookeeperConfig.java | 25 ++ .../controller/HandlerDataController.java | 58 +++ .../bfd/crawl/handlerdata/enums/ResponseCode.java | 32 ++ .../exception/GlobalExceptionHandler.java | 31 ++ .../handlerdata/service/ElasticSearchService.java | 424 +++++++++++++++++++++ .../crawl/handlerdata/service/HandlerService.java | 115 ++++++ .../bfd/crawl/handlerdata/service/SendService.java | 46 +++ .../crawl/handlerdata/service/StartServcie.java | 67 ++++ .../bfd/crawl/handlerdata/service/TaskService.java | 42 ++ .../handlerdata/service/ZookeeperNodeMonitor.java | 64 ++++ .../util/CronExpressionGeneratorUtil.java | 73 ++++ .../com/bfd/crawl/handlerdata/util/QueueUtil.java | 19 + .../com/bfd/crawl/handlerdata/util/StringUtil.java | 94 +++++ src/main/resources/application.yml | 59 +++ src/main/resources/logback-spring.xml | 39 ++ 23 files changed, 1562 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/com/bfd/crawl/handlerdata/HandlerDataApplication.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/bean/Constants.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/bean/ResponsePo.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/config/AsyncThreadConfiguration.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/config/SchedulerConfig.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/config/ZookeeperConfig.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/controller/HandlerDataController.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/enums/ResponseCode.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/exception/GlobalExceptionHandler.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/ElasticSearchService.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/HandlerService.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/SendService.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/StartServcie.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/TaskService.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/service/ZookeeperNodeMonitor.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/util/CronExpressionGeneratorUtil.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/util/QueueUtil.java create mode 100644 src/main/java/com/bfd/crawl/handlerdata/util/StringUtil.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/logback-spring.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ 
+!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..abd343a --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +采集平台应用 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..97acfde --- /dev/null +++ b/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + com.bfd.crawl + handlerData + 0.0.1-SNAPSHOT + handlerData + handlerData + + 1.8 + UTF-8 + UTF-8 + 2.2.4.RELEASE + + + + de.codecentric + spring-boot-admin-client + 2.2.4 + + + com.google.code.gson + gson + 2.8.8 + + + + + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba + fastjson + 1.2.0 + + + + com.squareup.okhttp3 + okhttp + 3.9.1 + + + + + + + + + org.projectlombok + lombok + true + + + + + + + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-clients + 2.3.1 + + + com.bfd.util + pauseTool + 1.0 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + handlerData-0.0.1-SNAPSHOT + + + org.springframework.boot + spring-boot-maven-plugin + 2.4.1 + + true + + + + + repackage + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + diff --git a/src/main/java/com/bfd/crawl/handlerdata/HandlerDataApplication.java b/src/main/java/com/bfd/crawl/handlerdata/HandlerDataApplication.java new file mode 100644 index 0000000..1e37371 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/HandlerDataApplication.java @@ -0,0 +1,30 @@ +package com.bfd.crawl.handlerdata; + +import de.codecentric.boot.admin.client.registration.Application; +import de.codecentric.boot.admin.client.registration.ApplicationRegistrator; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.stereotype.Component; +/** + * @author:jinming + * @className:HandlerDataApplication + * @version:1.0 + * @description: + * @Date:2023/7/31 17:14 + */ +@SpringBootApplication +@EnableAsync +@EnableScheduling +public class HandlerDataApplication { + + public static void main(String[] args) { + SpringApplication.run(HandlerDataApplication.class, args); + + } + +} diff --git a/src/main/java/com/bfd/crawl/handlerdata/bean/Constants.java b/src/main/java/com/bfd/crawl/handlerdata/bean/Constants.java new file mode 100644 index 0000000..2c1c628 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/bean/Constants.java @@ -0,0 +1,54 @@ +package com.bfd.crawl.handlerdata.bean; + +/** + * @author:jinming + * @className:Constants + * @version:1.0 + * @description: + * @Date:2023/7/14 10:41 + */ +public class Constants { + /** + * 查询数据时间 + */ + public static String TIME = "pubTime"; + + /** + * 查询数据开始时间 + */ + public static String START_TIME = "gte"; + + /** + * 查询数据结束时间 + */ + public static String END_TIME = "lte"; + + /** + * 查询排除词 + */ + 
public static String OR = "or"; + + /** + * Query field: title + */ + public static String TITLE = "title"; + /** + * Query field: author + */ + public static String AUTHOR = "author"; + /** + * Query field: content + */ + public static String CONTENT = "content"; + /** + * Condition type (include / exclusion) + */ + public static String TYPE = "type"; + + /** + * Condition type values and the attachment-filter key + */ + public static final String INCLUDE = "include"; + public static final String EXCLUSION = "exclusion"; + public static final String ATTACHMENTFILTER = "attachmentFilter"; +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/bean/ResponsePo.java b/src/main/java/com/bfd/crawl/handlerdata/bean/ResponsePo.java new file mode 100644 index 0000000..b0ec9df --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/bean/ResponsePo.java @@ -0,0 +1,59 @@ +package com.bfd.crawl.handlerdata.bean; + + +import com.bfd.crawl.handlerdata.enums.ResponseCode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author:jinming + * @className:ResponsePo + * @version:1.0 + * @description: + * @Date:2023/4/3 17:23 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ResponsePo { + /** + * Response code + */ + private int code; + + /** + * Response payload (JSON string) on success + */ + private Object data; + + /** + * Message describing the result + */ + private String message; + + public static ResponsePo success() { + return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage()); + } + + public static ResponsePo error() { + return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage()); + } + + public static ResponsePo setStatus(int code, String message) { + ResponsePo resultBean = new ResponsePo(); + resultBean.code = code; + resultBean.message = message; + return resultBean; + } + public ResponsePo(int code, String message) { + this.code = code; + this.message = message; + // data is intentionally left null for this constructor + } + public ResponsePo(ResponseCode responseCode){ + this.code = responseCode.getCode(); + this.message = responseCode.getMessage(); + // data is intentionally left null for this constructor + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/config/AsyncThreadConfiguration.java b/src/main/java/com/bfd/crawl/handlerdata/config/AsyncThreadConfiguration.java new file mode 100644 index 0000000..a9adabc --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/config/AsyncThreadConfiguration.java @@ -0,0 +1,48 @@ +package com.bfd.crawl.handlerdata.config; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +/** + * @author jinming + * @version 1.0 + * @className AsyncThreadConfiguration + * @Date 2022/2/17 18:37 + */ +@Configuration +@EnableAsync +public class AsyncThreadConfiguration { + @Bean(name = "asyncExecutor") + public Executor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // core pool size + executor.setCorePoolSize(500); + // maximum pool size (kept equal to the core size) + executor.setMaxPoolSize(500); + // queue capacity + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("handlerData-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } + @Bean(name = "sendExecutor") + public Executor sendExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // core pool size + executor.setCorePoolSize(500); + // maximum pool size (kept equal to the core size) + 
executor.setMaxPoolSize(500); + // 线程队列 + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("sendData-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } +} diff --git a/src/main/java/com/bfd/crawl/handlerdata/config/SchedulerConfig.java b/src/main/java/com/bfd/crawl/handlerdata/config/SchedulerConfig.java new file mode 100644 index 0000000..d465009 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/config/SchedulerConfig.java @@ -0,0 +1,25 @@ +package com.bfd.crawl.handlerdata.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +/** + * @author:jinming + * @className:SchedulerConfig + * @version:1.0 + * @description: + * @Date:2024/7/10 17:21 + */ +@Configuration +public class SchedulerConfig { + + @Bean + public ThreadPoolTaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(300); + scheduler.setThreadNamePrefix("task-"); + scheduler.initialize(); + return scheduler; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/config/ZookeeperConfig.java b/src/main/java/com/bfd/crawl/handlerdata/config/ZookeeperConfig.java new file mode 100644 index 0000000..f26731e --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/config/ZookeeperConfig.java @@ -0,0 +1,25 @@ +package com.bfd.crawl.handlerdata.config; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author jian.mao + * @date 2024年4月16日 + * @description + */ +@Configuration +public class ZookeeperConfig { + @Value("${zookeeper.connection-string}") + private String connectionString; + @Bean + public CuratorFramework curatorFramework() { + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); + curatorFramework.start(); + return curatorFramework; + } +} diff --git a/src/main/java/com/bfd/crawl/handlerdata/controller/HandlerDataController.java b/src/main/java/com/bfd/crawl/handlerdata/controller/HandlerDataController.java new file mode 100644 index 0000000..d4732ca --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/controller/HandlerDataController.java @@ -0,0 +1,58 @@ +package com.bfd.crawl.handlerdata.controller; + +import com.alibaba.fastjson.JSON; +import com.bfd.crawl.handlerdata.bean.ResponsePo; +import com.bfd.crawl.handlerdata.enums.ResponseCode; +import com.bfd.crawl.handlerdata.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author:jinming + * @className:HandlerDataController + * @version:1.0 + * @description: 处理接口 + * @Date:2023/7/13 14:25 + */ + +@RestController +@RequestMapping("/handlerdata") +@Slf4j +public class HandlerDataController { + + @PostMapping("/handler") + public ResponsePo 
documentFeedback(@RequestBody String dataJson) { + String trace = "trace"; + ResponsePo responsePo = ResponsePo.success(); + try { + Map parse = (Map) JSON.parse(dataJson); + log.info("新增任务:" + dataJson); + if (parse.containsKey(trace) && (Boolean) parse.get(trace) == true) { + log.info("测试流程,插入队首"); + QueueUtil.taskQueue.putFirst(dataJson); + } else { + QueueUtil.taskQueue.put(dataJson); + } + } catch (InterruptedException e) { + e.printStackTrace(); + log.error("任务添加队列异常" + e.getMessage()); + responsePo.setCode(ResponseCode.INTERNAL_SERVER_ERROR.getCode()); + responsePo.setMessage(ResponseCode.INTERNAL_SERVER_ERROR.getMessage()); + return responsePo; + } catch (Exception e) { + log.error("请求格式发生异常" + e.getMessage()); + responsePo.setCode(ResponseCode.FAILURE.getCode()); + responsePo.setMessage(ResponseCode.FAILURE.getMessage()); + return responsePo; + } + return responsePo; + } + + +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/enums/ResponseCode.java b/src/main/java/com/bfd/crawl/handlerdata/enums/ResponseCode.java new file mode 100644 index 0000000..f4db196 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/enums/ResponseCode.java @@ -0,0 +1,32 @@ +package com.bfd.crawl.handlerdata.enums; + +/** + * @author:jinming + * @className:ResponseCodeEnum + * @version:1.0 + * @description:响应结果码枚举类 + * @Date:2023/2/28 11:40 + */ +public enum ResponseCode { + //返回结果码枚举类 + SUCCESS(200, "操作成功"), + FAILURE(400, "参数错误"), + INTERNAL_SERVER_ERROR(500, "服务器内部错误"), + TYPE_NOT_SUPPORT(601,"文件类型不支持"); + + private int code; + private String message; + + ResponseCode(int code, String message) { + this.code = code; + this.message = message; + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/exception/GlobalExceptionHandler.java b/src/main/java/com/bfd/crawl/handlerdata/exception/GlobalExceptionHandler.java new file mode 100644 index 0000000..c361bf9 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/exception/GlobalExceptionHandler.java @@ -0,0 +1,31 @@ +package com.bfd.crawl.handlerdata.exception; + + +import com.bfd.crawl.handlerdata.bean.ResponsePo; +import com.bfd.crawl.handlerdata.enums.ResponseCode; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +/** + * @author:jinming + * @className:GlobalExceptionHandler + * @version:1.0 + * @description: 异常处理类 + * @Date:2023/2/28 16:29 + */ +@RestControllerAdvice +public class GlobalExceptionHandler { + + @ExceptionHandler(value = {IllegalArgumentException.class}) + @ResponseStatus(HttpStatus.BAD_REQUEST) + public ResponsePo handleBadRequest(Exception ex) { + return new ResponsePo(ResponseCode.FAILURE.getCode(), ex.getMessage()); + } + @ExceptionHandler(value = {Exception.class}) + @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) + public ResponsePo handleException(Exception ex) { + return new ResponsePo(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), ex.getMessage()); + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/ElasticSearchService.java b/src/main/java/com/bfd/crawl/handlerdata/service/ElasticSearchService.java new file mode 100644 index 0000000..991a168 --- /dev/null +++ 
b/src/main/java/com/bfd/crawl/handlerdata/service/ElasticSearchService.java @@ -0,0 +1,424 @@ +package com.bfd.crawl.handlerdata.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bfd.crawl.handlerdata.bean.Constants; +import com.bfd.crawl.handlerdata.util.QueueUtil; +import com.bfd.crawl.handlerdata.util.StringUtil; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.io.IOException; +import java.util.*; + +/** + * @author:jinming + * @className:ElasticSearchService + * @version:1.0 + * @description: + * @Date:2024/7/10 18:04 + */ +@Service +@Slf4j +public class ElasticSearchService { + @Value("${thread.sleep}") + private int sleepTimes; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + public void performQuery(Map parse) throws Exception { + String errorMessage = ""; + Gson gson = new GsonBuilder().serializeNulls().create(); + try { + + OkHttpClient client = new OkHttpClient().newBuilder() + .build(); + int id = (int) parse.get("id"); + int scenesId = (int) parse.get("scenes_id"); + Map admin = (Map) parse.get("input"); + List taskIds = (List) admin.get("taskId"); + String indexs = admin.get("Index").toString(); + String host = (String) admin.get("host"); + String port = (String) admin.get("port"); + String requestUrl = "http://"; + requestUrl = requestUrl.concat("172.26.250.44").concat(":").concat("9200").concat("/"); + String scoreUrl = requestUrl.concat("_search/scroll"); + requestUrl = requestUrl.concat("cl_special_1.0_").concat(indexs).concat("/_search?scroll=5m"); + int index = 1; + int dataIndex = 1; + int total = 0; + String scrollId = ""; + log.info("任务ID:" + id + "的查询连接为:" + requestUrl); + //总数据量统计 + int allDataTotal = 0; + for (String taskId : taskIds) { + String queryQl = makeQueryQl(admin, index, String.valueOf(scenesId), taskId); + log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的查询条件为:" + queryQl); + allDataTotal += getDataTotal(requestUrl, queryQl); + } + log.info("任务ID:" + id + "的总数据量为:" + allDataTotal + "共需要查询" + Math.ceil(allDataTotal / 500f) + "次"); + for (String taskId : taskIds) { + index = 1; + total = 0; + try { + String queryQl = null; + try { + queryQl = makeQueryQl(admin, index, String.valueOf(scenesId), taskId); + } catch (Exception e) { + e.printStackTrace(); + errorMessage = "查询语句组装失败"; + } + if (total == 0) { + total = getDataTotal(requestUrl, queryQl); + } + log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的查询条件为:" + queryQl); + log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的总数据量为:" + total + "共需要查询" + Math.ceil(total / 500f) + "次"); + MediaType mediaType = MediaType.parse("application/json"); + RequestBody body = RequestBody.create(mediaType, queryQl); + Request firstRequest = new Request.Builder() + .url(requestUrl) + .method("POST", body) + .addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") + .addHeader("Content-Type", "application/json") + .build(); + String firstResultString = null; + try { + Response firstResponse = client.newCall(firstRequest).execute(); + firstResultString = firstResponse.body().string(); + } catch (IOException e) { + e.printStackTrace(); + errorMessage = "ES 链接失败"; + } + Map resultParse = (Map) 
JSON.parse(firstResultString); + scrollId = (String) resultParse.get("_scroll_id"); + Map hitsMap = (Map) resultParse.get("hits"); + List hits = (List) hitsMap.get("hits"); + for (Map hitMap : hits) { + Map source = (Map) hitMap.get("_source"); + source.put("subjectId", indexs); + Map result = new HashMap(32); + Map resultData = new HashMap(32); + //获取输出字段 + Map output = (Map) parse.get("output"); + for (String key : output.keySet()) { + if (source.containsKey(key)) { + resultData.put(key, source.get(key)); + } + } + if (dataIndex == allDataTotal) { + resultData.put("isLast", 1); + Thread.sleep(sleepTimes * 1000); + } + result.put("results", JSONObject.toJSONString(resultData)); + result.put("status", 1); + parse.put("result", result); + String message = gson.toJson(parse); + if (!StringUtil.hasValue(errorMessage)) { + QueueUtil.sendQueue.put(message); + } + dataIndex++; + } + log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的第" + index + "次查询"); + index++; + do { + Map newQlMap = new HashMap(32); + newQlMap.put("scroll", "5m"); + newQlMap.put("scroll_id", scrollId); + MediaType mediaTypeScroll = MediaType.parse("application/json"); + RequestBody bodyScroll = RequestBody.create(mediaTypeScroll, JSON.toJSONString(newQlMap)); + Request scrollRequest = new Request.Builder() + .url(scoreUrl) + .method("POST", bodyScroll).addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") + .addHeader("Content-Type", "application/json") + .build(); + String scrollResultString = null; + try { + Response scrollResponse = client.newCall(scrollRequest).execute(); + scrollResultString = scrollResponse.body().string(); + } catch (IOException e) { + e.printStackTrace(); + errorMessage = "ES 链接失败"; + } + Map scrollResultParse = (Map) JSON.parse(scrollResultString); + + Map scrollHitsMap = (Map) scrollResultParse.get("hits"); + hits = (List) scrollHitsMap.get("hits"); + for (Map hitMap : hits) { + Map source = (Map) hitMap.get("_source"); + source.put("subjectId", indexs); + Map result = new HashMap(32); + Map resultData = new HashMap(32); + //获取输出字段 + Map output = (Map) parse.get("output"); + for (String key : output.keySet()) { + if (source.containsKey(key)) { + resultData.put(key, source.get(key)); + } + } + if (dataIndex == allDataTotal) { + resultData.put("isLast", 1); + Thread.sleep(sleepTimes * 1000); + } + result.put("status", 1); + result.put("message", errorMessage); + result.put("results", JSONObject.toJSONString(resultData)); + parse.put("result", result); + String message = gson.toJson(parse); + if (!StringUtil.hasValue(errorMessage)) { + QueueUtil.sendQueue.put(message); + } + dataIndex++; + } + log.info("任务ID:" + id + "采集平台任务ID:" + taskId + "的第" + index + "次查询"); + index++; + } while (hits.size() != 0 && hits.size() == 500); + log.info("任务ID:" + id + "查询结束"); + } catch (Exception e) { + e.printStackTrace(); + } + } + stringRedisTemplate.opsForHash().put("selection", String.valueOf(scenesId), String.valueOf(System.currentTimeMillis())); + } catch (Throwable e) { + log.error("处理程序发生异常:" + e.getMessage()); + e.printStackTrace(); + errorMessage = "未知异常"; + Map result = new HashMap(32); + result.put("status", 2); + result.put("results", ""); + result.put("message", errorMessage); + parse.put("result", result); + String message = gson.toJson(parse); + try { + QueueUtil.sendQueue.put(message); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + if (StringUtil.hasValue(errorMessage)) { + Map result = new HashMap(32); + result.put("status", 2); + result.put("results", ""); + 
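// status 2 marks a failed run; the message field tells the downstream consumer why the query step failed + 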
result.put("message", errorMessage); + parse.put("result", result); + String message = gson.toJson(parse); + try { + QueueUtil.sendQueue.put(message); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + } + + private int getDataTotal(String requestUrl, String queryQl) { + int total = 0; + try { + OkHttpClient client = new OkHttpClient().newBuilder() + .build(); + MediaType mediaType = MediaType.parse("application/json"); + RequestBody body = RequestBody.create(mediaType, queryQl); + Request firstRequest = new Request.Builder() + .url(requestUrl) + .method("POST", body) + .addHeader("Authorization", "Basic ZWxhc3RpYzpiYWlmZW5kaWFuMTIz") + .addHeader("Content-Type", "application/json") + .build(); + Response firstResponse = client.newCall(firstRequest).execute(); + String firstResultString = firstResponse.body().string(); + Map resultParse = (Map) JSON.parse(firstResultString); + Map hitsMap = (Map) resultParse.get("hits"); + Map totalMap = (Map) hitsMap.get("total"); + total = (int) totalMap.get("value"); + } catch (Exception e) { + e.printStackTrace(); + } + return total; + } + + public String makeQueryQl(Map admin, int pageIndex, String scenesId, String... taskId) { + int dataRange = 1; + try { + dataRange = (int) admin.get("dataRange"); + } catch (Exception e) { + } + int size = 500; + int from = (pageIndex - 1) * size; + Map queryQl = new HashMap(32); + queryQl.put("from", from); + queryQl.put("size", size); + Map queryMap = new HashMap(32); + Map boolMap = new HashMap(32); + List mustList = new ArrayList<>(); + List shouldList = new ArrayList<>(); + List mustNotList = new ArrayList<>(); + if (taskId.length > 0) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + match.put("taskId", taskId[0]); + matchMap.put("match", match); + mustList.add(matchMap); + } + if (admin.containsKey(Constants.TIME)) { + Map timeCondition = new HashMap(32); + Map time = (Map) admin.get(Constants.TIME); + Long startTime = (Long) time.get(Constants.START_TIME); + Long endTime = (Long) time.get(Constants.END_TIME); + Map rangeMap = new HashMap(32); + Map pubTimeMap = new HashMap(32); + pubTimeMap.put("gte", startTime); + pubTimeMap.put("lte", endTime); + rangeMap.put("pubTime", pubTimeMap); + timeCondition.put("range", rangeMap); + mustList.add(timeCondition); + } + if (dataRange == 0) { + long nowtime = System.currentTimeMillis(); + long startTime = 0L; + + //取上次查询的抓取时间来进行时间的过滤,只针对增量逻辑 + boolean hasTime = stringRedisTemplate.opsForHash().hasKey("selection", scenesId); + if (hasTime) { + long lastQueryTime = Long.valueOf(stringRedisTemplate.opsForHash().get("selection", scenesId).toString()); + boolean isNew = nowtime - lastQueryTime > 10 * 60 * 1000; + if (isNew) { + startTime = lastQueryTime; + } + } + Map timeCondition = new HashMap(32); + Map rangeMap = new HashMap(32); + Map crawlTimeMap = new HashMap(32); + crawlTimeMap.put("gte", startTime); + rangeMap.put("crawlTime", crawlTimeMap); + timeCondition.put("range", rangeMap); + mustList.add(timeCondition); + //操作后放入当前时间,作为下次查询的条件 + } + + if (admin.containsKey(Constants.OR)) { + List>> ors = (List>>) admin.get(Constants.OR); + int loopIndex = 0; + + for (List> or : ors) { + + for (Map stringStringMap : or) { + String type = stringStringMap.get(Constants.TYPE); + String title = stringStringMap.get(Constants.TITLE); + String author = stringStringMap.get(Constants.AUTHOR); + String content = stringStringMap.get(Constants.CONTENT); + switch (type) { + case Constants.INCLUDE: + if (StringUtil.hasValue(title)) { + Map matchMap 
= new HashMap(32); + Map match = new HashMap(32); + Map contentMap = new HashMap(32); + contentMap.put("query", title); + match.put("title", contentMap); + matchMap.put("match_phrase", match); + if (loopIndex > 0) { + shouldList.add(matchMap); + } else { + mustList.add(matchMap); + } + + } + if (StringUtil.hasValue(content)) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + Map contentMap = new HashMap(32); + contentMap.put("query", content); + contentMap.put("boost", 5); + match.put("content", contentMap); + matchMap.put("match_phrase", match); + if (loopIndex > 0) { + shouldList.add(matchMap); + } else { + mustList.add(matchMap); + } + } + if (StringUtil.hasValue(author)) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + Map contentMap = new HashMap(32); + contentMap.put("query", author); + contentMap.put("boost", 5); + match.put("author", contentMap); + matchMap.put("match_phrase", match); + if (loopIndex > 0) { + shouldList.add(matchMap); + } else { + mustList.add(matchMap); + } + } + break; + case Constants.EXCLUSION: + if (StringUtil.hasValue(title)) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + match.put("title", title); + matchMap.put("match", match); + mustNotList.add(matchMap); + } + if (StringUtil.hasValue(content)) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + match.put("content", content); + matchMap.put("match", match); + mustNotList.add(matchMap); + } + if (StringUtil.hasValue(author)) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + match.put("author", author); + matchMap.put("match", match); + mustNotList.add(matchMap); + } + break; + default: + break; + } + } + loopIndex++; + } + } + if (admin.containsKey(Constants.ATTACHMENTFILTER)) { + Map attachmentFilter = (Map) admin.get(Constants.ATTACHMENTFILTER); + Set attachmentFilterKeys = attachmentFilter.keySet(); + for (String attachmentFilterKey : attachmentFilterKeys) { + boolean aBoolean = attachmentFilter.get(attachmentFilterKey); + if (aBoolean) { + Map matchMap = new HashMap(32); + Map match = new HashMap(32); + match.put(attachmentFilterKey, 1); + matchMap.put("term", match); + mustList.add(matchMap); + } + } + } + + if (shouldList.size() > 0) { + boolMap.put("should", shouldList); + } + if (mustList.size() > 0) { + boolMap.put("must", mustList); + } + if (mustNotList.size() > 0) { + boolMap.put("must_not", mustNotList); + } + if (boolMap.isEmpty()) { + Map matchAll = new HashMap(32); + queryMap.put("match_all", matchAll); + } else { + queryMap.put("bool", boolMap); + } + queryQl.put("query", queryMap); + return JSON.toJSONString(queryQl); + } + +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/HandlerService.java b/src/main/java/com/bfd/crawl/handlerdata/service/HandlerService.java new file mode 100644 index 0000000..a0b8e14 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/service/HandlerService.java @@ -0,0 +1,115 @@ +package com.bfd.crawl.handlerdata.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bfd.crawl.handlerdata.bean.Constants; +import com.bfd.crawl.handlerdata.util.CronExpressionGeneratorUtil; +import com.bfd.crawl.handlerdata.util.QueueUtil; +import com.bfd.crawl.handlerdata.util.StringUtil; +import com.bfd.util.PauseTool; +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; +import org.apache.catalina.valves.CrawlerSessionManagerValve; +import org.checkerframework.checker.units.qual.A; +import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +import java.util.*; + +/** + * @author:jinming + * @className:HandlerService + * @version:1.0 + * @description: + * @Date:2023/7/13 15:03 + */ +@Service +@Slf4j +public class HandlerService { + + @Autowired + private ElasticSearchService elasticSearchService; + + @Resource + private StringRedisTemplate stringRedisTemplate; + @Autowired + private TaskService taskService; + + @Async("asyncExecutor") + public void doSearch() { + + while (true) { + if (QueueUtil.taskQueue.size() > 0) { + String dataJson = null; + try { + dataJson = QueueUtil.taskQueue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + continue; + } + Map parse = (Map) JSON.parse(dataJson); + int scenesId = (int) parse.get("scenes_id"); + int version = (int) parse.get("version"); + String pauseKey = scenesId + "_" + version; + if (!PauseTool.CACHE.containsKey(pauseKey)) { + log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version); + continue; + } + int id = (int) parse.get("id"); + Map admin = (Map) parse.get("input"); + int dataUpdate = 0; + try { + dataUpdate = (int) admin.get("dataUpdate"); + } catch (Exception e) { + e.printStackTrace(); + } + try { + elasticSearchService.performQuery(parse); + if (dataUpdate == 0) { + continue; + } + Map cycle = (Map) admin.get("cycle"); + String unit = (String) cycle.get("unit"); + String time = null; + try { + time = (String) cycle.get("time"); + } catch (Exception e) { + // time is only supplied for daily/weekly/monthly schedules + } + List date = null; + try { + date = (List) cycle.get("date"); + } catch (Exception e) { + // the date list is only supplied for weekly and monthly schedules + } + String cronExpression = CronExpressionGeneratorUtil.generateCronExpression(unit, date, time); + taskService.scheduleTask(String.valueOf(scenesId), () -> { + try { + elasticSearchService.performQuery(parse); + } catch (Exception e) { + e.printStackTrace(); + } + }, cronExpression); + + stringRedisTemplate.opsForHash().put("selectionTask", String.valueOf(scenesId), dataJson); + log.info("流程{}的定时任务已经启动,对应的cron表达式为:{}", scenesId, cronExpression); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + log.info("任务队列为空,休眠3秒"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/SendService.java b/src/main/java/com/bfd/crawl/handlerdata/service/SendService.java new file mode 100644 index 0000000..3b0ba66 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/service/SendService.java @@ -0,0 +1,46 @@ +package com.bfd.crawl.handlerdata.service; + +import com.bfd.crawl.handlerdata.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +/** + * @author:jinming + * @className:SendService + * @version:1.0 + * @description: + * @Date:2023/7/31 17:53 + */ +@Slf4j +@Service +public class SendService { + @Value("${send.topic}") + private String topic; + @Autowired + private KafkaTemplate<String, String> kafkaTemplate; + + @Async("sendExecutor") + public void sendToKafka() { + while (true) 
{ + if (QueueUtil.sendQueue.size() > 0) { + try { + String message = QueueUtil.sendQueue.take(); + kafkaTemplate.send(topic,message); + } catch (Exception e) { + e.printStackTrace(); + } + }else { + log.info("任务队列为空,休眠3秒"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/StartServcie.java b/src/main/java/com/bfd/crawl/handlerdata/service/StartServcie.java new file mode 100644 index 0000000..5cd17b5 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/service/StartServcie.java @@ -0,0 +1,67 @@ +package com.bfd.crawl.handlerdata.service; + +import com.bfd.crawl.handlerdata.util.QueueUtil; +import com.bfd.util.PauseTool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Set; + +/** + * @author:jinming + * @className:StartServcie + * @version:1.0 + * @description: + * @Date:2023/7/31 17:14 + */ +@Service +@Slf4j +public class StartServcie implements ApplicationRunner { + @Value("${thread.handler}") + private int handlerNumber; + + @Value("${thread.send}") + private int sendNumber; + @Autowired + private HandlerService handlerService; + @Autowired + private SendService sendService; + + @Value("${zookeeper.connection-string}") + private String connectionString; + @Value("${zookeeper.publish-node}") + private String nodePath; + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Override + public void run(ApplicationArguments args) throws Exception { + PauseTool pauseTool = new PauseTool(); + pauseTool.initializeRedisCache(stringRedisTemplate); + pauseTool.setupZookeeperListener(connectionString, nodePath); + try { + Set selectionTask = stringRedisTemplate.opsForHash().keys("selectionTask"); + for (Object o : selectionTask) { + String dataJson = (String) stringRedisTemplate.opsForHash().get("selectionTask", o); + QueueUtil.taskQueue.put(dataJson); + } + log.info("已从缓存加载" + selectionTask.size() + "条任务"); + } catch (Exception e) { + } + for (int i = 0; i < handlerNumber; i++) { + log.info("处理服务线程" + i + "已启动 "); + handlerService.doSearch(); + } + for (int i = 0; i < sendNumber; i++) { + log.info("发送服务线程" + i + "已启动 "); + sendService.sendToKafka(); + } + + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/TaskService.java b/src/main/java/com/bfd/crawl/handlerdata/service/TaskService.java new file mode 100644 index 0000000..69df1c0 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/service/TaskService.java @@ -0,0 +1,42 @@ +package com.bfd.crawl.handlerdata.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; + +/** + * @author:jinming + * @className:TaskService + * @version:1.0 + * @description: + * @Date:2024/7/10 17:21 + */ +@Service + +public class TaskService { + + @Autowired + private ThreadPoolTaskScheduler taskScheduler; + + 
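/** Maps each taskId (a scenes_id) to its ScheduledFuture so an existing schedule can be cancelled before re-scheduling or on stop. */ + 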
private ConcurrentHashMap<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>(); + + public void scheduleTask(String taskId, Runnable task, String cronExpression) { + if (scheduledTasks.containsKey(taskId)) { + scheduledTasks.get(taskId).cancel(false); + } + + ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(task, new CronTrigger(cronExpression)); + scheduledTasks.put(taskId, scheduledFuture); + } + + public void cancelTask(String taskId) { + if (scheduledTasks.containsKey(taskId)) { + scheduledTasks.get(taskId).cancel(false); + scheduledTasks.remove(taskId); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/service/ZookeeperNodeMonitor.java b/src/main/java/com/bfd/crawl/handlerdata/service/ZookeeperNodeMonitor.java new file mode 100644 index 0000000..1dcc6dc --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/service/ZookeeperNodeMonitor.java @@ -0,0 +1,64 @@ +package com.bfd.crawl.handlerdata.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * @author jian.mao + * @date 2024-04-17 + * @description + */ +@Component +@Slf4j +public class ZookeeperNodeMonitor { + + @Resource + private StringRedisTemplate stringRedisTemplate; + @Autowired + private CuratorFramework curatorFramework; + @Value("${zookeeper.publish-node}") + private String nodePath; + @Autowired + private TaskService taskService; + + @PostConstruct + public void init() { + try { + // create a cache that watches the publish node + NodeCache nodeCache = new NodeCache(curatorFramework, nodePath); + nodeCache.start(); + + // react whenever the node data changes + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + byte[] data = nodeCache.getCurrentData().getData(); + String nodeData = new String(data); + log.info("Node data changed: " + nodeData); + JSONObject jsonObject = JSON.parseObject(nodeData); + int scenesId = jsonObject.getIntValue("scenes_id"); + String operation = jsonObject.getString("operation"); + String stopStatus = "stop"; + if (stopStatus.equals(operation)) { + stringRedisTemplate.opsForHash().delete("selection", String.valueOf(scenesId)); + stringRedisTemplate.opsForHash().delete("selectionTask", String.valueOf(scenesId)); + taskService.cancelTask(String.valueOf(scenesId)); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/bfd/crawl/handlerdata/util/CronExpressionGeneratorUtil.java b/src/main/java/com/bfd/crawl/handlerdata/util/CronExpressionGeneratorUtil.java new file mode 100644 index 0000000..84c497d --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/util/CronExpressionGeneratorUtil.java @@ -0,0 +1,73 @@ +package com.bfd.crawl.handlerdata.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * @author:jinming + * @className:CronExpressionGeneratorUtil + * @version:1.0 + * @description: + * @Date:2024/7/10 17:43 + */ +public class CronExpressionGeneratorUtil { + /** + * 
Real-time (interpreted here as once per minute) + */ + private static final String UNIT_REAL_TIME = "0"; + /** + * Daily + */ + private static final String UNIT_DAILY = "1"; + /** + * Weekly + */ + private static final String UNIT_WEEKLY = "2"; + /** + * Monthly + */ + private static final String UNIT_MONTHLY = "3"; + + public static String generateCronExpression(String unit, List<Integer> datess, String time) { + int[] dates = null; + try { + dates = datess.stream().mapToInt(Integer::intValue).toArray(); + } catch (Exception e) { /* the date list is only supplied for weekly and monthly schedules */ + } + String minutes = null; + String hours = null; + try { + String[] timeParts = time.split(":"); + minutes = timeParts[1]; + hours = timeParts[0]; + } catch (Exception e) { /* time is not supplied for the real-time unit */ + } + + switch (unit) { + case UNIT_REAL_TIME: + return "0 * * * * ?"; + case UNIT_DAILY: + return String.format("0 %s %s * * ?", minutes, hours); + case UNIT_WEEKLY: + String daysOfWeek = arrayToString(dates); + return String.format("0 %s %s ? * %s", minutes, hours, daysOfWeek); + case UNIT_MONTHLY: + String daysOfMonth = arrayToString(dates); + return String.format("0 %s %s %s * ?", minutes, hours, daysOfMonth); + default: + throw new IllegalArgumentException("Invalid unit"); + } + } + + private static String arrayToString(int[] array) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < array.length; i++) { + sb.append(array[i]); + if (i < array.length - 1) { + sb.append(","); + } + } + return sb.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/util/QueueUtil.java b/src/main/java/com/bfd/crawl/handlerdata/util/QueueUtil.java new file mode 100644 index 0000000..413a7ed --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/util/QueueUtil.java @@ -0,0 +1,19 @@ +package com.bfd.crawl.handlerdata.util; + +import java.util.LinkedHashMap; +import java.util.Set; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * @author:jinming + * @className:QueueUtil + * @version:1.0 + * @description: + * @Date:2023/7/13 15:00 + */ +public class QueueUtil { + + public static LinkedBlockingDeque<String> taskQueue = new LinkedBlockingDeque<>(); + + public static LinkedBlockingDeque<String> sendQueue = new LinkedBlockingDeque<>(); +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/handlerdata/util/StringUtil.java b/src/main/java/com/bfd/crawl/handlerdata/util/StringUtil.java new file mode 100644 index 0000000..661fbf6 --- /dev/null +++ b/src/main/java/com/bfd/crawl/handlerdata/util/StringUtil.java @@ -0,0 +1,94 @@ +package com.bfd.crawl.handlerdata.util; + + +import lombok.extern.slf4j.Slf4j; + +import java.security.MessageDigest; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author jinming + * @version 1.0 + * @className StringUtil + * @Date 2022/1/21 11:46 + */ +@Slf4j +public class StringUtil { + public static boolean hasValue(String str) { + return str != null && !"".equals(str.trim()); + } + + public static String getRegexGroup(String regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Pattern p = Pattern.compile(regex); + Matcher m = p.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + } + + return resultStr; + } + + public static Set<String> getEmailAddress(String message) { + Set<String> emailList = new HashSet<>(); + Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+"); + Matcher m = pattern.matcher(message); + while (m.find()) { + emailList.add(m.group(0)); + } + return emailList; + } + public static String getMd5(String 
string) { + try { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] bs = md5.digest(string.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(40); + for (byte x : bs) { + if ((x & 0xff) >> 4 == 0) { + sb.append("0").append(Integer.toHexString(x & 0xff)); + } else { + sb.append(Integer.toHexString(x & 0xff)); + } + } + return sb.toString(); + } catch (Exception e) { + //LOG.error("获取md5异常", e); + return "nceaform" + System.currentTimeMillis(); + } + } + + public static String removeAllHtmlTags(String str) { + return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : ""; + } + + public static String getRegexGroup(Pattern regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Matcher m = regex.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + log.error(regex + " parser error!"); + } + + return resultStr; + } + + public static String getStrByPattern(String str, String regex) { + Pattern pattern = Pattern.compile(regex); + Matcher m = pattern.matcher(str); + return m.find() ? m.group(0) : ""; + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..e768626 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,59 @@ +server: + port: 7080 +spring: + application: + name: handlerData + boot: + admin: + client: + health: + timeout: 10s + url: http://172.16.12.55:8001 + instance: + service-base-url: http://10.10.144.49:8080 + kafka: + bootstrap-servers: 172.26.28.30:9092 + producer: + retries: 3 + acks: all + batch-size: 4096 + buffer-memory: 102476800 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + redis: + host: 172.18.1.101 + port: 6379 + timeout: 10000 + database: 5 + jedis: + pool: + max-active: 8 # 连接池最大连接数(使用负值表示没有限制) + max-wait: 800 # 连接池最大阻塞等待时间(使用负值表示没有限制) + +management: + endpoints: + web: + exposure: + include: "*" + endpoint: + health: + show-details: always + health: + elasticsearch: + enabled: false + +logging: + file: + path: ./logs + level: + de.codecentric.boot.admin.client: DEBUG +zookeeper: + connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181 + publish-node: /analyze + +thread: + handler: 5 + send: 50 + sleep: 10 +send: + topic: analyze123213 diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..9307a82 --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,39 @@ + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + + + + + true + + ${logging.level} + + + ${logging.file.path}/handler-data.log + + + ${logging.file.path}/handler-data.log.%d{yyyy-MM-dd} + 3 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + UTF-8 + + + + + + + + + + +