commit 354ac7fbc948837b9122e6c4afd3b6c1b4c8e74f
Author: 55007 <55007@maojian>
Date: Tue Jan 7 17:02:46 2025 +0800
网站数据采集应用管理
diff --git a/.classpath b/.classpath
new file mode 100644
index 0000000..1a0a8d2
--- /dev/null
+++ b/.classpath
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..daef30b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+/target/
+/logs/
+/.idea/
+/crawltaskmanager.iml
\ No newline at end of file
diff --git a/.project b/.project
new file mode 100644
index 0000000..2f5cfe7
--- /dev/null
+++ b/.project
@@ -0,0 +1,23 @@
+
+
+ crawltaskmanager
+
+
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+
+
+ org.eclipse.m2e.core.maven2Builder
+
+
+
+
+
+ org.eclipse.jdt.core.javanature
+ org.eclipse.m2e.core.maven2Nature
+
+
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index 0000000..839d647
--- /dev/null
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,5 @@
+eclipse.preferences.version=1
+encoding//src/main/java=UTF-8
+encoding//src/main/resources=UTF-8
+encoding//src/test/java=UTF-8
+encoding/=UTF-8
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..71df522
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,9 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
+org.eclipse.jdt.core.compiler.release=disabled
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs
new file mode 100644
index 0000000..f897a7f
--- /dev/null
+++ b/.settings/org.eclipse.m2e.core.prefs
@@ -0,0 +1,4 @@
+activeProfiles=
+eclipse.preferences.version=1
+resolveWorkspaceProjects=true
+version=1
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c3a3db3
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+采集任务管理应用
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ac0501f
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,244 @@
+
+
+
+ 4.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.4.RELEASE
+
+
+ com.bfd.task
+ crawltaskmanager
+ 0.0.1-SNAPSHOT
+
+ crawltaskmanager
+
+ http://www.example.com
+
+
+ UTF-8
+ 1.8
+ 1.8
+
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ de.codecentric
+ spring-boot-admin-starter-client
+ 2.2.4
+
+
+ com.google.code.gson
+ gson
+ 2.8.8
+
+
+ org.springframework.boot
+ spring-boot-test
+
+
+
+ org.springframework
+ spring-test
+ 5.0.10.RELEASE
+ test
+
+
+ commons-io
+ commons-io
+ 1.4
+
+
+ com.alibaba
+ fastjson
+ 2.0.17
+
+
+
+ com.mchange
+ c3p0
+ 0.9.5.5
+
+
+ mysql
+ mysql-connector-java
+ 8.0.29
+
+
+
+ com.squareup.okhttp3
+ okhttp
+ 4.9.3
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.3
+
+
+ commons-lang
+ commons-lang
+ 2.6
+
+
+
+ org.jetbrains.kotlin
+ kotlin-reflect
+ 1.6.21
+ runtime
+
+
+
+ org.jsoup
+ jsoup
+ 1.8.1
+
+
+
+ org.projectlombok
+ lombok
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+
+ cn.hutool
+ hutool-all
+ 5.8.5
+
+
+ junit
+ junit
+
+
+
+ p6spy
+ p6spy
+ 3.9.0
+
+
+
+ commons-collections
+ commons-collections
+ 3.2.2
+
+
+
+ org.redisson
+ redisson-spring-boot-starter
+ 3.13.6
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 3.1.0
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+
+ maven-surefire-plugin
+ 2.22.1
+
+
+ maven-jar-plugin
+ 3.0.2
+
+
+ maven-install-plugin
+ 2.5.2
+
+
+ maven-deploy-plugin
+ 2.8.2
+
+
+
+ maven-site-plugin
+ 3.7.1
+
+
+ maven-project-info-reports-plugin
+ 3.0.0
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ com.bfd.task.Application
+ ZIP
+
+
+ ${project.groupId}
+ ${project.artifactId}
+
+
+
+
+
+
+ repackage
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.1.1
+
+
+ copy
+ package
+
+ copy-dependencies
+
+
+ jar
+ jar
+ runtime
+ ${project.build.directory}/libs
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/bfd/task/Application.java b/src/main/java/com/bfd/task/Application.java
new file mode 100644
index 0000000..4486759
--- /dev/null
+++ b/src/main/java/com/bfd/task/Application.java
@@ -0,0 +1,25 @@
+package com.bfd.task;
+
+
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+
+/**
+ * 主入口
+ *
+ * @author jian.mao
+ * @date 2023年7月4日
+ * @description
+ */
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+
+}
diff --git a/src/main/java/com/bfd/task/cache/ConfigCache.java b/src/main/java/com/bfd/task/cache/ConfigCache.java
new file mode 100644
index 0000000..b07b7f5
--- /dev/null
+++ b/src/main/java/com/bfd/task/cache/ConfigCache.java
@@ -0,0 +1,30 @@
+package com.bfd.task.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Holder for the process-wide mutable state shared by the background threads.
+ *
+ * @author jian.mao
+ * @date 2022-11-11
+ */
+public class ConfigCache {
+
+    /**
+     * Run flag polled by the background loops. It is cleared from the JVM shutdown-hook
+     * thread, so it must be {@code volatile} for the polling threads to observe the write.
+     */
+    public static volatile boolean isStart = true;
+
+    /**
+     * Token cache keyed by task id. The original note on this field reads
+     * "token cache, 30 minutes without activity"; the cache monitor compares each value
+     * against {@code System.currentTimeMillis()} to decide whether the task has expired.
+     */
+    public static ConcurrentHashMap<String, Object> tokenCache = new ConcurrentHashMap<>(16);
+
+    /**
+     * Task cache keyed by task id; readers cast the value to a {@code Map}.
+     */
+    public static ConcurrentHashMap<String, Object> taskCache = new ConcurrentHashMap<>(16);
+}
diff --git a/src/main/java/com/bfd/task/controller/TaskManagerController.java b/src/main/java/com/bfd/task/controller/TaskManagerController.java
new file mode 100644
index 0000000..9a26491
--- /dev/null
+++ b/src/main/java/com/bfd/task/controller/TaskManagerController.java
@@ -0,0 +1,46 @@
+package com.bfd.task.controller;
+
+
+import javax.annotation.Resource;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.entity.ResponsePo;
+import com.bfd.task.service.TaskManagerService;
+
+
+/**
+ * HTTP endpoints for task management, mounted under {@code /task}.
+ *
+ * @author jian.mao
+ * @date 2023-09-19
+ */
+@Controller
+@RequestMapping("/task")
+@Slf4j
+public class TaskManagerController {
+
+    @Resource
+    TaskManagerService taskManagerService;
+
+    /**
+     * Accepts a task definition as a raw JSON request body, logs it and hands it to
+     * {@link TaskManagerService#addTask}.
+     *
+     * @param dataJson the request body exactly as received
+     * @return the service's {@link ResponsePo} serialized to a JSON string
+     */
+    @PostMapping("/add")
+    @ResponseBody
+    public String addTask(@RequestBody String dataJson) {
+        log.info("请求参数:{}", dataJson);
+        return JSONObject.toJSONString(taskManagerService.addTask(dataJson));
+    }
+
+    /**
+     * Probe endpoint that always answers with the fixed string {@code "123"}.
+     *
+     * @param param ignored
+     * @param token ignored
+     * @return the literal {@code "123"}
+     */
+    @RequestMapping(value = "/hello", method = RequestMethod.GET)
+    @ResponseBody
+    public String hello(String param, String token) {
+        return "123";
+    }
+}
diff --git a/src/main/java/com/bfd/task/entity/Constants.java b/src/main/java/com/bfd/task/entity/Constants.java
new file mode 100644
index 0000000..704626c
--- /dev/null
+++ b/src/main/java/com/bfd/task/entity/Constants.java
@@ -0,0 +1,159 @@
+package com.bfd.task.entity;
+
+
+/**
+ * 常量实体类
+ * @author jian.mao
+ * @date 2022年11月15日
+ * @description
+ */
+public class Constants {
+ /*************************蓝图常量key名称*********************************/
+ public final static String SCHEDULING = "scheduling";
+ public final static String TYPE = "type";
+ public final static String INTERVAL = "interval";
+ public final static String CREATED = "created";
+ public final static String LAST_EDIT = "last_edit";
+ public final static String BLUEPRINT_ID = "blueprint_id";
+ public final static String BLUEPRINTID = "blueprintId";
+ public final static String BLUEPRINT_NAME = "name";
+ public final static String SCENARIO = "scenario";
+ public final static String AUTOCOMMITTRIGGERLAST = "autoCommitTriggerLast";
+ public final static String FRESHVARIABLES = "freshVariables";
+ public final static String AUTOCOMMIT = "autoCommit";
+ public final static String MAXERRORS = "maxErrors";
+ public final static String DATALOSS = "dataloss";
+ public final static String POSITION = "position";
+ public final static String SCENES_ID = "scenes_id";
+ public final static String SCENESID = "scenesId";
+ public final static String MULTI_BRANCH = "multi_branch";
+
+ public final static String SINGLE = "single";
+ /**已重试次数**/
+ public final static String ERROR_TIME = "error_time";
+ public final static String PREVIOUS_RESULT = "previous_result";
+
+ /****数据id*****/
+ public final static String BUSINESSKEY = "businessKey";
+
+
+ /*************************metadata常量key名称*********************************/
+ public final static String OUTPUT = "output";
+ public final static String LABEL_COL = "label_col";
+ public final static String LABEL = "label";
+ public final static String INPUT = "input";
+ public final static String USER = "user";
+ public final static String ADMIN = "admin";
+ public final static String ADDRESS = "address";
+ public final static String DATASOURCE = "datasource";
+ public final static String INDEX = "index";
+
+ /*************************app常量key名称*********************************/
+ public final static String APPS = "apps";
+ public final static String TRANSFER_ID = "transfer_id";
+ public final static String MODULE = "module";
+ public final static String VERSION = "version";
+ public final static String METADATA = "metadata";
+ public final static String DATA = "data";
+ public final static String APP_NAME = "name";
+ public final static String DESCRIBE = "describe";
+ public final static String NEXT_APP_ID = "next_app_id";
+ public final static String EDGE_ID = "edge_id";
+ public final static String START_ID = "start_id";
+ public final static String END_ID = "end_id";
+
+ public final static String WAIT_CONDITION = "wait_condition";
+ public final static String START_TAG = "start_tag";
+
+ /*************************module类型*********************************/
+ public final static String FILE = "file";
+ public final static String OCR = "OCR";
+ public final static String FILTER = "Filter";
+ public final static String CHATGPT = "ChatGPT";
+ public final static String MYSQL = "mysql";
+
+ /*************************other类型*********************************/
+ public final static String UNDERLINE = "_";
+ public final static String RESULT_TOPIC = null;
+ public static final String EMPTY = "";
+ public static final String HTTP = "http";
+ public static final String REQUEST_ERROR_MESSAGE = "Download failed error is";
+ public static final String REQUEST_RESULT = "result";
+ public static final String REQUEST_RESULT_RESULTS = "results";
+ public static final String MAP_TYPE = "Map";
+ public static final String LIST_TYPE = "List";
+ public static final String STRING_TYPE = "String";
+ public static final String DOCUMENT_TYPE = "doc";
+ public static final String FILTER_ZH = "过滤器";
+
+ public static final String JSON_SELE_SYMBOL = "$.";
+ public static final String LEFT_BRACKETS = "[";
+ public static final String RIGTH_BRACKETS = "]";
+ public static final String TASKTYPE = "taskType";
+ public static final Integer USER_TYPE = 1;
+ public static final Integer KEYWORD_TYPE = 0;
+ public static final Integer DETAIL_TYPE = 2;
+ public static final String CID = "cid";
+ public static final String SITETYPE = "siteType";
+ public static final Integer DEFULT_SUBJECTID = 304864;
+ public static final Integer DEFULT_CRAWLCYCLICITYTIME = 1440;
+ public static final String CRAWLENDTIME = "crawlEndTime";
+ public static final String CRAWLSTARTTIME = "crawlStartTime";
+ public static final String CRAWLPAGETYPES = "crawlPageTypes";
+ public static final String APPID = "113ic";
+ public static final String APP_ID = "appId";
+ public final static String ID = "id";
+ public static final Integer DEFULT_CRAWLPERIODHOUR = 24;
+ public static final String CREATEUSERID_ANALYZE = "662015832180933762";
+ public static final String CREATEUSERID = "createUserId";
+ public static final String CRAWL_ADD_URL = "https://caiji.percent.cn/api/crawl/remote/task/save";
+ public static final String CRAWLKEYWORD = "crawlKeyword";
+ public static final String ATTACHTAG = "attachTag";
+ public static final String ATTACHTAG_VALUE = "analyze";
+ public static final String KEYWORD = "keyword";
+ public static final String SITEID = "siteId";
+ public static final String RESULTS = "results";
+ public static final String RESULT = "result";
+ public static final String CRAWLDATAFLAG = "crawlDataFlag";
+ public static final String CRAWLDATAFLAG_PREFIX = "\"crawlDataFlag\":\"keyword:";
+ public static final String TID = "tid";
+ public static final Long TIME_OUT = 1800000L;
+ public static final String ATTR = "attr";
+ public static final String HASVIDEO = "hasVideo";
+ public static final String CRAWL_END_MARK = "crawl_end_mark";
+ public static final String CRAWL_END_MESSAGE = "crawl_end_message";
+ public static final String CRAWL_END_MESSAGE_VALUE = "数据采集完成";
+ public static final String SUBJECTID = "subjectId";
+ public static final String SUBJECT_ID = "subject_id";
+ public static final String KAFKA_TOPIC = "kafka_topic";
+ public static final String KAFKA_ADDR = "kafka_addr";
+ public static final String TASK_ID = "task_id";
+ public static final String TASKID = "taskId";
+ public static final String CODE = "code";
+ public static final int SUCCESS_CODE = 200;
+ public static final String WEB_URL_SUFFIX = "/api/cda/caiji/status";
+ public static final String STATUS = "status";
+
+ /*****关键词任务前缀crawldataflag*******/
+ public static final String KEYWORD_PREFIX = "keyword:";
+ /*****用户任务前缀crawldataflag*******/
+ public static final String ACCOUNT_PREFIX = "account:";
+ /*****url任务前缀crawldataflag*******/
+ public static final String URL_PREFIX = "url:";
+ /************************redis*************************************/
+ public static final String LOCK_KEY = "myLock";
+ public static final long LOCK_EXPIRE_TIME = 300000;
+ public static final String APP_CODE = "app_code";
+ public static final String APPCODE = "appCode";
+ public static final String ISLAST = "isLast";
+
+ public static final String REDISKEY_MIDDLE = "#####";
+
+
+
+
+ /****************************es字段key映射*****************/
+ public static final String VIDEOPATH = "videoPath";
+ public static final String SOURCE_DATA_ID = "source_data_id";
+ public static final String MESSAGE = "message";
+}
diff --git a/src/main/java/com/bfd/task/entity/ResponsePo.java b/src/main/java/com/bfd/task/entity/ResponsePo.java
new file mode 100644
index 0000000..b6e574d
--- /dev/null
+++ b/src/main/java/com/bfd/task/entity/ResponsePo.java
@@ -0,0 +1,62 @@
+package com.bfd.task.entity;
+
+
+
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import com.bfd.task.enums.ResponseCode;
+
+/**
+ * Uniform response envelope: a numeric code, an optional payload and a message.
+ * Getters, setters, the no-arg constructor and the all-args constructor are generated
+ * by Lombok.
+ *
+ * @author jinming
+ * @version 1.0
+ * @date 2023/4/3 17:23
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ResponsePo {
+    /**
+     * Response code.
+     */
+    private int code;
+
+    /**
+     * Payload; on success this normally holds the JSON string of the returned data.
+     */
+    private Object data;
+
+    /**
+     * Message describing the outcome.
+     */
+    private String message;
+
+    /**
+     * @return a response carrying the code and message of {@link ResponseCode#SUCCESS}
+     */
+    public static ResponsePo success() {
+        return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage());
+    }
+
+    /**
+     * @return a response carrying the code and message of {@link ResponseCode#FAILURE}
+     */
+    public static ResponsePo error() {
+        return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage());
+    }
+
+    /**
+     * Creates a response with the given code and message and no payload.
+     *
+     * @param code    response code
+     * @param message response message
+     * @return the new response; {@code data} is {@code null}
+     */
+    public static ResponsePo setStatus(int code, String message) {
+        return new ResponsePo(code, message);
+    }
+
+    /**
+     * Creates a response without a payload.
+     * The former {@code this.data = data} here assigned the field to itself (there is no
+     * {@code data} parameter) and has been removed; {@code data} stays {@code null}.
+     *
+     * @param code    response code
+     * @param message response message
+     */
+    public ResponsePo(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    /**
+     * Creates a response without a payload from a predefined {@link ResponseCode}.
+     *
+     * @param responseCode source of the code and message
+     */
+    public ResponsePo(ResponseCode responseCode) {
+        this(responseCode.getCode(), responseCode.getMessage());
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/bfd/task/enums/ResponseCode.java b/src/main/java/com/bfd/task/enums/ResponseCode.java
new file mode 100644
index 0000000..df3d7a4
--- /dev/null
+++ b/src/main/java/com/bfd/task/enums/ResponseCode.java
@@ -0,0 +1,33 @@
+package com.bfd.task.enums;
+
+/**
+ * Response result codes and their messages.
+ *
+ * @author jinming
+ * @version 1.0
+ * @date 2023/2/28 11:40
+ */
+public enum ResponseCode {
+    // NOTE(review): FAILCIRCULATION and TYPE_NOT_SUPPORT share the numeric code 601, so
+    // the two cannot be told apart by code alone. Left unchanged because callers may
+    // depend on the current values.
+    SUCCESS(200, "操作成功"),
+    FAILURE(400, "参数错误"),
+    FAILCIRCULATION(601, "数据消费错误"),
+    FAILADDTASK(1001, "任务下发失败"),
+    INTERNAL_SERVER_ERROR(500, "服务器内部错误"),
+    TYPE_NOT_SUPPORT(601, "文件类型不支持");
+
+    /** Numeric result code; final because enum constants are shared singletons. */
+    private final int code;
+    /** Message associated with the code. */
+    private final String message;
+
+    ResponseCode(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    /**
+     * @return the numeric result code
+     */
+    public int getCode() {
+        return code;
+    }
+
+    /**
+     * @return the message associated with the code
+     */
+    public String getMessage() {
+        return message;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/bfd/task/handler/MainHander.java b/src/main/java/com/bfd/task/handler/MainHander.java
new file mode 100644
index 0000000..4fe52e6
--- /dev/null
+++ b/src/main/java/com/bfd/task/handler/MainHander.java
@@ -0,0 +1,116 @@
+package com.bfd.task.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.io.FileUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.cache.ConfigCache;
+import com.bfd.task.process.CacheMonitorProcess;
+import com.bfd.task.process.DataConsumptionProcess;
+import com.bfd.task.process.SendResultProcess;
+import com.bfd.task.utils.FileUtil;
+
+
+/**
+ * Startup runner: reloads the persisted caches, starts the three background worker
+ * threads and registers a shutdown hook that persists the caches again.
+ *
+ * @author jian.mao
+ * @date 2023-09-15
+ */
+@Component
+@Order(value = 1)
+@Slf4j
+public class MainHander implements ApplicationRunner {
+
+    @Value("${task.task-queue-path}")
+    private String taskPath;
+    @Value("${task.token-queue-path}")
+    private String tokenPath;
+    @Autowired
+    SendResultProcess sendResultProcess;
+    @Autowired
+    CacheMonitorProcess cacheMonitorProcess;
+    @Autowired
+    DataConsumptionProcess dataConsumptionProcess;
+
+    /**
+     * Restores both caches from disk, then starts the result-sending, data-consumption
+     * and cache-monitoring threads and finally installs the shutdown hook.
+     *
+     * @param args application arguments (unused)
+     */
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        // Reload the caches persisted by the previous run.
+        readTask(taskPath, ConfigCache.taskCache);
+        readTask(tokenPath, ConfigCache.tokenCache);
+        // Result forwarding thread.
+        log.info("sendResultProcess is start");
+        new Thread(sendResultProcess).start();
+        // Data consumption thread.
+        log.info("dataConsumptionProcess is start");
+        new Thread(dataConsumptionProcess).start();
+        // Cache monitoring thread.
+        log.info("cacheMonitorProcess is start");
+        new Thread(cacheMonitorProcess).start();
+        // Persist state on shutdown.
+        waitDown();
+    }
+
+    /**
+     * Loads a persisted cache file into {@code map}. Every non-blank line is parsed as a
+     * JSON object and its entries are copied into {@code map}; the file is deleted once
+     * it has been read. A missing file is a no-op. If the file cannot be read, the error
+     * is logged and the file is left in place so the data is not lost.
+     *
+     * @param path location of the persisted cache file
+     * @param map  destination for the loaded entries
+     */
+    @SuppressWarnings("unchecked")
+    public static void readTask(String path, Map<String, Object> map) {
+        File file = new File(path);
+        if (!file.exists()) {
+            return;
+        }
+        List<String> lines;
+        try {
+            lines = FileUtils.readLines(file, "UTF-8");
+        } catch (IOException e) {
+            // Previously this fell through with a null list and crashed with an NPE.
+            log.error("failed to read cache file {}", path, e);
+            return;
+        }
+        for (String line : lines) {
+            if (line == null || line.trim().isEmpty()) {
+                continue;
+            }
+            Map<String, Object> loaded = JSONObject.parseObject(line);
+            if (loaded == null) {
+                continue;
+            }
+            for (Entry<String, Object> entry : loaded.entrySet()) {
+                // The shared caches are ConcurrentHashMaps, which reject null values.
+                if (entry.getValue() != null) {
+                    map.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        if (!file.delete()) {
+            log.warn("could not delete cache file {} after loading it", path);
+        }
+    }
+
+    /**
+     * Registers a JVM shutdown hook that clears the run flag and writes both caches to
+     * disk.
+     */
+    public void waitDown() {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Tell the worker loops to stop.
+            ConfigCache.isStart = false;
+            log.info("stop-------");
+            writeTsskToFile();
+        }));
+    }
+
+    /**
+     * Persists the non-empty caches to their configured files as JSON.
+     */
+    public void writeTsskToFile() {
+        if (!ConfigCache.taskCache.isEmpty()) {
+            FileUtil.writeFile(taskPath, JSONObject.toJSONString(ConfigCache.taskCache));
+        }
+        if (!ConfigCache.tokenCache.isEmpty()) {
+            FileUtil.writeFile(tokenPath, JSONObject.toJSONString(ConfigCache.tokenCache));
+        }
+        log.info("cache write is file end");
+    }
+}
diff --git a/src/main/java/com/bfd/task/model/AppsEntity.java b/src/main/java/com/bfd/task/model/AppsEntity.java
new file mode 100644
index 0000000..578bc5e
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/AppsEntity.java
@@ -0,0 +1,54 @@
+package com.bfd.task.model;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+import lombok.Data;
+
+/**
+ * Entity for an app node. Accessors, equals/hashCode and toString are generated by
+ * Lombok's {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-07-05
+ */
+@Data
+public class AppsEntity {
+ /*** Blueprint id ***/
+ private Integer blueprintId;
+ /*** Transfer id ***/
+ private Integer transferId;
+ /*** App id ***/
+ private Integer appId;
+ /*** Module (model) id ***/
+ private Integer moduleId;
+ /*** Name of the node operation ***/
+ private String name;
+ /*** Node description ***/
+ private String describe;
+ /*** Nodes to wait for ***/
+ private String waitCondition;
+ /*** Whether this is the start node ***/
+ private Integer startTag;
+ /*** Position (coordinates) ***/
+ private String position;
+ /*** Version ***/
+ private Integer version;
+ /*** Output ***/
+ private String output;
+ /*** Input ***/
+ private String input;
+ /*** User input ***/
+ private String user;
+ /*** Administrator input ***/
+ private String admin;
+ /*** Address information of the model service ***/
+ private String address;
+ /*** Result data ***/
+ private String data;
+ /*** Creation time ***/
+ private LocalDateTime created;
+ /*** Last modification time ***/
+ private Timestamp lastEdit;
+
+}
diff --git a/src/main/java/com/bfd/task/model/BlueprintEntity.java b/src/main/java/com/bfd/task/model/BlueprintEntity.java
new file mode 100644
index 0000000..1246f4b
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/BlueprintEntity.java
@@ -0,0 +1,44 @@
+package com.bfd.task.model;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+import lombok.Data;
+
+/**
+ * Blueprint entity. Accessors, equals/hashCode and toString are generated by Lombok's
+ * {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-07-05
+ */
+@Data
+public class BlueprintEntity {
+ /*** Blueprint id ***/
+ private Integer blueprintId;
+ /*** Scene id ***/
+ private Integer scenesId;
+ /*** Blueprint name ***/
+ private String name;
+ /*** Scheduling type ***/
+ private String schedulingType;
+ /*** Scheduling interval (period) ***/
+ private Integer schedulingInterval;
+ /*** Whether the blueprint commits last ***/
+ private Integer autoCommitTriggerLast;
+ /*** Blueprint status: success / failure ***/
+ private Integer dataloss;
+ /*** Retry count ***/
+ private Integer maxErrors;
+ /*** Auto commit ***/
+ private Integer autoCommit;
+ /*** NOTE(review): undocumented in the original; meaning not visible from this class ***/
+ private Integer freshVariables;
+ /*** Creation time ***/
+ private LocalDateTime created;
+ /*** Last modification time ***/
+ private Timestamp lastEdit;
+ /*** NOTE(review): undocumented in the original; meaning not visible from this class ***/
+ private String user;
+ /*** Single / multi-branch flag. NOTE(review): the only public field in this class; the others are private ***/
+ public Integer multiBranch;
+}
diff --git a/src/main/java/com/bfd/task/model/ModulesEntity.java b/src/main/java/com/bfd/task/model/ModulesEntity.java
new file mode 100644
index 0000000..71d0f90
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/ModulesEntity.java
@@ -0,0 +1,28 @@
+package com.bfd.task.model;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+import lombok.Data;
+
+/**
+ * Module entity. Accessors, equals/hashCode and toString are generated by Lombok's
+ * {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-07-05
+ */
+@Data
+public class ModulesEntity {
+ /*** Module (model) id ***/
+ private Integer moduleId;
+ /*** Module (model) name ***/
+ private String module;
+ /*** Address of the module (model) logo ***/
+ private String logo;
+ /*** Creation time ***/
+ private LocalDateTime created;
+ /*** Last modification time ***/
+ private Timestamp lastEdit;
+
+}
diff --git a/src/main/java/com/bfd/task/model/RelationsEntity.java b/src/main/java/com/bfd/task/model/RelationsEntity.java
new file mode 100644
index 0000000..a4e6d93
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/RelationsEntity.java
@@ -0,0 +1,32 @@
+package com.bfd.task.model;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+import lombok.Data;
+
+/**
+ * Relation entity linking a start node to an end node within a blueprint. Accessors,
+ * equals/hashCode and toString are generated by Lombok's {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-07-05
+ */
+@Data
+public class RelationsEntity {
+ /*** Relation id ***/
+ private Integer relationId;
+ /*** Blueprint id ***/
+ private Integer blueprintId;
+ /*** Start node id ***/
+ private Integer startId;
+ /*** End node id ***/
+ private Integer endId;
+ /*** Creation time ***/
+ private LocalDateTime created;
+ /*** Last modification time ***/
+ private Timestamp lastEdit;
+ /*** NOTE(review): undocumented in the original; meaning not visible from this class ***/
+ private String user;
+
+}
diff --git a/src/main/java/com/bfd/task/model/TaskParam.java b/src/main/java/com/bfd/task/model/TaskParam.java
new file mode 100644
index 0000000..42c419f
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/TaskParam.java
@@ -0,0 +1,61 @@
+package com.bfd.task.model;
+
+import java.util.List;
+
+import lombok.Data;
+
+/**
+ * Parameters for adding a crawl task. Accessors, equals/hashCode and toString are
+ * generated by Lombok's {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-09-19
+ */
+@Data
+public class TaskParam {
+
+
+ /** Unique identifier **/
+ private String id ;
+ /** Site id of the website **/
+ private Integer siteId ;
+ /** Site type **/
+ private Integer siteType ;
+ /** Crawl frequency **/
+ private Integer crawlCyclicityTime ;
+ /** Crawl end time **/
+ private Long crawlEndTime ;
+ /** Crawl mode **/
+ private Integer crawlMode ;
+ /** Page types **/
+ private List crawlPageTypes ;
+ /** Incremental time range **/
+ private Integer crawlPeriodHour ;
+ /** Crawl start time **/
+ private Long crawlStartTime ;
+ /** Creator id **/
+ private String createUserId ;
+ /** Upper limit on paging **/
+ private Integer maxPageNum ;
+ /** Batch flag **/
+ private Integer isBatch ;
+ /** Subject id **/
+ private Integer subjectId ;
+ /** Task type **/
+ private Integer taskType ;
+ /** Keyword **/
+ private String crawlKeyword ;
+ /** Tag **/
+ private String attachTag;
+ /** Website cid **/
+ private String cid ;
+ /** Timestamp **/
+ private Long time ;
+ /** App id **/
+ private String appId ;
+ /** Signature **/
+ private String sign ;
+ /** Scene id **/
+ private Integer scenesId;
+ /** Project id **/
+ private Integer blueprintId;
+}
diff --git a/src/main/java/com/bfd/task/model/WebSite.java b/src/main/java/com/bfd/task/model/WebSite.java
new file mode 100644
index 0000000..c78d8bb
--- /dev/null
+++ b/src/main/java/com/bfd/task/model/WebSite.java
@@ -0,0 +1,19 @@
+package com.bfd.task.model;
+
+import lombok.Data;
+
+
+/**
+ * Website table row. Accessors, equals/hashCode and toString are generated by Lombok's
+ * {@code @Data}.
+ *
+ * @author jian.mao
+ * @date 2023-09-19
+ */
+@Data
+public class WebSite {
+
+ // Row id (presumably the primary key; confirm against the table schema).
+ private Integer id;
+ // Site id of the website.
+ private Integer siteId;
+ // Website cid.
+ private String cid;
+ // Status; the meaning of the values is not visible from this class.
+ private Integer status;
+}
diff --git a/src/main/java/com/bfd/task/process/CacheMonitorProcess.java b/src/main/java/com/bfd/task/process/CacheMonitorProcess.java
new file mode 100644
index 0000000..4a839df
--- /dev/null
+++ b/src/main/java/com/bfd/task/process/CacheMonitorProcess.java
@@ -0,0 +1,81 @@
+package com.bfd.task.process;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.cache.ConfigCache;
+import com.bfd.task.entity.Constants;
+import com.bfd.task.utils.DownLoadUtil;
+import com.bfd.task.utils.QueueUtil;
+
+/**
+ * 监控缓存变量动态
+ * @author jian.mao
+ * @date 2024年3月14日
+ * @description
+ */
+@Slf4j
+@Component
+public class CacheMonitorProcess implements Runnable{
+
+ @Value("${manageweb.host}")
+ private String webUrlProfix;
+ @Override
+ public void run() {
+ // TODO Auto-generated method stub
+ while(ConfigCache.isStart){
+ try {
+ for (Entry entry:ConfigCache.tokenCache.entrySet()) {
+ String key = entry.getKey();
+ long value = (long)entry.getValue();
+ long currentTimeMillis = System.currentTimeMillis();
+ if(currentTimeMillis >= value){
+ if(!ConfigCache.taskCache.containsKey(key)){
+ log.error("监控任务中不存在:{}",key);
+ continue;
+ }
+ Map task = (Map) ConfigCache.taskCache.get(key);
+ log.info("释放任务:{}",JSONObject.toJSONString(task));
+ //发送结束标识
+ Map endResults = new HashMap(16);
+ // 结果集组装
+ Map result = new HashMap(16);
+ endResults.put(Constants.TASKID, key);
+ endResults.put(Constants.CRAWL_END_MARK, "ok");
+ endResults.put(Constants.CRAWL_END_MESSAGE, Constants.CRAWL_END_MESSAGE_VALUE);
+ endResults.put(Constants.ISLAST,1);
+ result.put(Constants.RESULTS, JSONObject.toJSONString(endResults));
+ result.put(Constants.ISLAST, true);
+ task.put(Constants.RESULT, result);
+ //发送采集结束标识,先注掉,后面在考虑
+ QueueUtil.sendQueue.put(JSONObject.toJSONString(task));
+ //反馈给前端系统
+ Map param = new HashMap(16);
+ param.put(Constants.APPCODE, task.get(Constants.APP_CODE));
+ param.put(Constants.STATUS, 3);
+ log.info("结束触发参数:{}",JSONObject.toJSONString(param));
+ String html = DownLoadUtil.doPost(webUrlProfix+Constants.WEB_URL_SUFFIX, JSONObject.toJSONString(param));
+ log.info("通知管理系统采集结束,管理系统返回结果:{}",html);
+ //清空缓存
+ ConfigCache.tokenCache.remove(key);
+ ConfigCache.taskCache.remove(key);
+ log.info("taskId:{},任务已过期。",key);
+ }
+ }
+ //10秒监控
+ Thread.sleep(10*1000);
+ } catch (Exception e) {
+ log.error("未知异常:",e);
+ }
+
+ }
+ }
+
+}
diff --git a/src/main/java/com/bfd/task/process/DataConsumptionProcess.java b/src/main/java/com/bfd/task/process/DataConsumptionProcess.java
new file mode 100644
index 0000000..e83e555
--- /dev/null
+++ b/src/main/java/com/bfd/task/process/DataConsumptionProcess.java
@@ -0,0 +1,145 @@
+package com.bfd.task.process;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.cache.ConfigCache;
+import com.bfd.task.entity.Constants;
+import com.bfd.task.utils.QueueUtil;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author jian.mao
+ * @date 2023年9月21日
+ * @description
+ */
+@Slf4j
+@Component
+public class DataConsumptionProcess implements Runnable {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+ @Value("${customize-kafka.consumer.topic}")
+ private String topic;
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ // 创建 Kafka 消费者配置
+ Map consumerProps = new HashMap(16);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ //跟读
+ consumerProps.put("auto.offset.reset", "latest");
+ consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ consumerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Consumer consumer = new KafkaConsumer<>(consumerProps);
+ try {
+ // 订阅主题
+ consumer.subscribe(Collections.singletonList(topic));
+ // 消费消息
+ while (true) {
+ // 没超时的话正常消费数据
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord record : records) {
+ Map resultData = new HashMap(32);
+ try {
+ // 处理消息,这里可以根据需要进行业务处理
+ Map resultEs = JSONObject.parseObject(record.value());
+ log.info("Received message: "+ record.value());
+ if(!resultEs.containsKey(Constants.TASKID)){
+ log.warn("数据体缺少taskId");
+ continue;
+ }
+ String taskId = resultEs.get(Constants.TASKID).toString();
+ if(!ConfigCache.taskCache.containsKey(taskId)){
+ log.warn("不属于有知任务产出的数据,taskId:{}",taskId);
+ continue;
+ }
+ Map task = (Map) ConfigCache.taskCache.get(taskId);
+ String token = (String) task.get(Constants.BUSINESSKEY);
+ Map input = (Map) task.get(Constants.INPUT);
+ Integer hasVideo = (Integer) input.get(Constants.HASVIDEO);
+ if(resultEs.get(Constants.HASVIDEO).equals(hasVideo)){
+ Map crawlResults = JSONObject.parseObject(record.value());
+ //结果加工 例如videopath[]转换成String
+ bulidResult(crawlResults);
+ // 结果集组装
+ Map result = new HashMap(16);
+ //结果内容
+ Map data = new HashMap(16);
+ //获取输出字段
+ Map output = (Map) task.get(Constants.OUTPUT);
+ for (String key: output.keySet()) {
+ if (crawlResults.containsKey(key)){
+ data.put(key,crawlResults.get(key));
+ }
+ }
+ result.put(Constants.RESULTS, JSONObject.toJSONString(data));
+ for (String key : task.keySet()) {
+ resultData.put(key, task.get(key));
+ }
+ result.put(Constants.STATUS, 1);
+ result.put(Constants.MESSAGE, "成功");
+ resultData.put(Constants.RESULT, result);
+ QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData));
+ //taskId赋值
+ if(resultEs.containsKey(Constants.TASKID)){
+ if(taskId == null){
+ taskId = resultEs.get(Constants.TASKID).toString();
+ }
+ }
+ }else{
+ log.info("不符合需求数据----");
+ }
+ } catch (Exception e) {
+ // TODO: handle exception
+ log.error("数据格式异常:{}",record.value());
+ //结果集
+ Map result = new HashMap(16);
+ //遍历入库返回结果,拼接响应内容
+ result.put(Constants.RESULTS, e.getMessage());
+ result.put(Constants.MESSAGE, "异常");
+ result.put(Constants.STATUS, 2);
+ resultData.put(Constants.RESULT, result);
+ //发送kafka
+ QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData));
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("kafka消费异常\n", e);
+ consumer.close();
+ }
+ }
+
+ /**
+ * 结果加工
+ * @param result
+ */
+ private void bulidResult(Map result){
+ //视频gofast地址加工
+ List videoPath = (List) result.get(Constants.VIDEOPATH);
+ if(videoPath != null && videoPath.size() > 0){
+ String videoUrl = videoPath.get(0);
+ result.put(Constants.VIDEOPATH, videoUrl);
+ }
+ }
+}
diff --git a/src/main/java/com/bfd/task/process/KafkaConsumerProcess.java b/src/main/java/com/bfd/task/process/KafkaConsumerProcess.java
new file mode 100644
index 0000000..5054f37
--- /dev/null
+++ b/src/main/java/com/bfd/task/process/KafkaConsumerProcess.java
@@ -0,0 +1,228 @@
+package com.bfd.task.process;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.cache.ConfigCache;
+import com.bfd.task.entity.Constants;
+import com.bfd.task.utils.DateUtil;
+import com.bfd.task.utils.DownLoadUtil;
+import com.bfd.task.utils.QueueUtil;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author jian.mao
+ * @date 2023年9月21日
+ * @description
+ */
+@Slf4j
+public class KafkaConsumerProcess implements Runnable {
+
+ private String bootstrapServers;
+ private String groupId;
+ private String topic;
+ private Map task;
+ private String webUrlProfix;
+
+
+
+
+ public String getWebUrlProfix() {
+ return webUrlProfix;
+ }
+
+ public void setWebUrlProfix(String webUrlProfix) {
+ this.webUrlProfix = webUrlProfix;
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Map getTask() {
+ return task;
+ }
+
+ public void setTask(Map task) {
+ this.task = task;
+ }
+
+
+
+ public KafkaConsumerProcess(String bootstrapServers, String groupId,
+ String topic, Map task, String webUrlProfix) {
+ this.bootstrapServers = bootstrapServers;
+ this.groupId = groupId;
+ this.topic = topic;
+ this.task = task;
+ this.webUrlProfix = webUrlProfix;
+ }
+
+ public KafkaConsumerProcess() {
+
+ }
+ @Override
+ public void run() {
+ // 创建 Kafka 消费者配置
+ Map consumerProps = new HashMap(16);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ //跟读
+ consumerProps.put("auto.offset.reset", "latest");
+ consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ consumerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Consumer consumer = new KafkaConsumer<>(consumerProps);
+ String taskId = null;
+ Map endResults = null;
+ try {
+ String token = (String) task.get(Constants.BUSINESSKEY);
+ Map input = (Map) task.get(Constants.INPUT);
+ Integer hasVideo = (Integer) input.get(Constants.HASVIDEO);
+ log.info("任务id;{},消费地址:{},消费主题:{},消费组:{}",token,bootstrapServers,topic,groupId);
+ // 订阅主题
+ consumer.subscribe(Collections.singletonList(topic));
+ // 消费消息
+ while (true) {
+ // 判断是否过期
+ Long time = (long)ConfigCache.tokenCache.get(token);
+ if(time == null){
+ time = System.currentTimeMillis();
+ ConfigCache.tokenCache.put(token, time);
+ }
+ // 没超时的话正常消费数据
+ if (System.currentTimeMillis() - time < Constants.TIME_OUT) {
+ log.info("此任务正常消费:{},消费组id:{},消费时间:{},当前时间:{}",token,groupId,DateUtil.getDateTime(time),DateUtil.getDateTime(System.currentTimeMillis()));
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord record : records) {
+ Map resultData = new HashMap(32);
+ for (String key : task.keySet()) {
+ resultData.put(key, task.get(key));
+ }
+ // 处理消息,这里可以根据需要进行业务处理
+ Map resultEs = JSONObject.parseObject(record.value());
+ log.info("Received message: "+ record.value());
+ if(((String)resultEs.get(Constants.ATTR)).contains(token) && resultEs.get(Constants.HASVIDEO).equals(hasVideo)){
+ endResults = resultEs;
+ Map crawlResults = JSONObject.parseObject(record.value());
+ //结果加工 例如videopath[]转换成String
+ bulidResult(crawlResults);
+ // 结果集组装
+ Map result = new HashMap(16);
+ //结果内容
+ Map data = new HashMap(16);
+ //获取输出字段
+ Map output = (Map) task.get(Constants.OUTPUT);
+ for (String key: output.keySet()) {
+ if (crawlResults.containsKey(key)){
+ data.put(key,crawlResults.get(key));
+ }
+ }
+ result.put(Constants.RESULTS, JSONObject.toJSONString(data));
+ resultData.put(Constants.RESULT, result);
+ QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData));
+ ConfigCache.tokenCache.put(token,System.currentTimeMillis());
+ //taskId赋值
+ if(resultEs.containsKey(Constants.TASKID)){
+ if(taskId == null){
+ taskId = resultEs.get(Constants.TASKID).toString();
+ }
+ }
+ }else{
+ log.info("不符合需求数据----");
+ }
+ }
+ }else{
+ log.info("kafka消费者会话过期,已进行销毁----");
+ ConfigCache.tokenCache.remove(token);
+ //发送结束标识
+ Map resultData = new HashMap(32);
+ for (String key : task.keySet()) {
+ resultData.put(key, task.get(key));
+ }
+ // 结果集组装
+ Map result = new HashMap(16);
+ if(taskId == null){
+ log.info("此任务:{},没有采集到数据",token);
+ }else{
+ endResults = new HashMap(16);
+ endResults.put(Constants.TASKID, taskId);
+ endResults.put(Constants.CRAWL_END_MARK, "ok");
+ endResults.put(Constants.CRAWL_END_MESSAGE, Constants.CRAWL_END_MESSAGE_VALUE);
+ result.put(Constants.RESULTS, JSONObject.toJSONString(endResults));
+ result.put(Constants.ISLAST, true);
+ resultData.put(Constants.RESULT, result);
+ //发送采集结束标识,先注掉,后面在考虑
+ QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData));
+ }
+ consumer.close();
+ //反馈给前端系统
+ Map param = new HashMap(16);
+// param.put(Constants.BUSINESSKEY, resultData.get(Constants.BUSINESSKEY));
+// param.put(Constants.APP_ID, resultData.get(Constants.APP_ID));
+ param.put(Constants.APPCODE, resultData.get(Constants.APP_CODE));
+// param.put(Constants.SCENESID, resultData.get(Constants.SCENES_ID));
+ param.put(Constants.STATUS, 3);
+ log.info("结束触发参数:{}",JSONObject.toJSONString(param));
+ String html = DownLoadUtil.doPost(webUrlProfix+Constants.WEB_URL_SUFFIX, JSONObject.toJSONString(param));
+ log.info("通知管理系统采集结束,管理系统返回结果:{}",html);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ log.error("kafka消费异常\n", e);
+ consumer.close();
+ }
+ }
+
+ /**
+ * 结果加工
+ * @param result
+ */
+ private void bulidResult(Map result){
+ //视频gofast地址加工
+ List videoPath = (List) result.get(Constants.VIDEOPATH);
+ if(videoPath != null && videoPath.size() > 0){
+ String videoUrl = videoPath.get(0);
+ result.put(Constants.VIDEOPATH, videoUrl);
+ }
+ }
+}
diff --git a/src/main/java/com/bfd/task/process/SendResultProcess.java b/src/main/java/com/bfd/task/process/SendResultProcess.java
new file mode 100644
index 0000000..2d193d7
--- /dev/null
+++ b/src/main/java/com/bfd/task/process/SendResultProcess.java
@@ -0,0 +1,41 @@
+package com.bfd.task.process;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.bfd.task.utils.QueueUtil;
+import com.bfd.task.utils.SpringBootKafka;
+
+/**
+ * @author jian.mao
+ * @date 2023年9月21日
+ * @description
+ */
+@Component
+@Slf4j
+public class SendResultProcess implements Runnable{
+ @Autowired
+ private SpringBootKafka springBootKafka;
+ @Value("${customize-kafka.producer.topic}")
+ private String topic;
+ @Override
+ public void run() {
+ // TODO Auto-generated method stub
+ while (true) {
+ String task;
+ try {
+ task = QueueUtil.sendQueue.take();
+ //写入kafka
+ springBootKafka.send(topic,task);
+ log.info("数据流转至下游-------");
+ } catch (InterruptedException e) {
+ log.error("获取发送数据异常",e);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/bfd/task/service/TaskManagerService.java b/src/main/java/com/bfd/task/service/TaskManagerService.java
new file mode 100644
index 0000000..aaf60d7
--- /dev/null
+++ b/src/main/java/com/bfd/task/service/TaskManagerService.java
@@ -0,0 +1,19 @@
+package com.bfd.task.service;
+
+import com.bfd.task.entity.ResponsePo;
+
+/**
+ * Service-layer contract for task management.
+ *
+ * @author jian.mao
+ * @date 2023-09-19
+ */
+public interface TaskManagerService {
+
+    /**
+     * Adds a task.
+     *
+     * @param param task request as a string
+     * @return response object describing the outcome
+     */
+    ResponsePo addTask(String param);
+}
diff --git a/src/main/java/com/bfd/task/service/impl/TaskManagerServiceImpl.java b/src/main/java/com/bfd/task/service/impl/TaskManagerServiceImpl.java
new file mode 100644
index 0000000..55bcc71
--- /dev/null
+++ b/src/main/java/com/bfd/task/service/impl/TaskManagerServiceImpl.java
@@ -0,0 +1,284 @@
+package com.bfd.task.service.impl;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.bfd.task.cache.ConfigCache;
+import com.bfd.task.entity.Constants;
+import com.bfd.task.entity.ResponsePo;
+import com.bfd.task.enums.ResponseCode;
+import com.bfd.task.model.TaskParam;
+import com.bfd.task.process.KafkaConsumerProcess;
+import com.bfd.task.service.TaskManagerService;
+import com.bfd.task.utils.DownLoadUtil;
+import com.bfd.task.utils.OtherUtils;
+
+/**
+ * 逻辑层实现类
+ * @author jian.mao
+ * @date 2023年9月19日
+ * @description
+ */
+@Service
+@Slf4j
+public class TaskManagerServiceImpl implements TaskManagerService {
+
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+ @Value("${customize-kafka.consumer.topic}")
+ private String topic;
+ @Value("${manageweb.host}")
+ private String webUrlProfix;
+ @Autowired
+ private StringRedisTemplate stringRedisTemplate;
+
+ @Override
+ public ResponsePo addTask(String param) {
+ // TODO Auto-generated method stub
+ ResponsePo responsePo = ResponsePo.success();
+ Map paramMap = null;
+ //读取队列
+ try {
+ paramMap = JSONObject.parseObject(param);
+ //任务下发
+ addTask(responsePo,paramMap);
+ } catch (Exception e) {
+ log.error("请求格式发生异常\n",e);
+ responsePo.setCode(ResponseCode.FAILURE.getCode());
+ responsePo.setMessage(ResponseCode.FAILURE.getMessage());
+ }
+ if(responsePo.getCode() != Constants.SUCCESS_CODE){
+ return responsePo;
+ }
+
+ /*try {
+ //流程流转
+ sendResult(paramMap);
+ } catch (Exception e) {
+ // TODO: handle exception
+ log.error("数据消费启动异常\n",e);
+ responsePo.setCode(ResponseCode.FAILCIRCULATION.getCode());
+ responsePo.setMessage(ResponseCode.FAILCIRCULATION.getMessage());
+ }*/
+
+ return responsePo;
+ }
+
+ /**
+  * Starts a background thread running a {@code KafkaConsumerProcess} for the given task.
+  *
+  * <p>The consumer reads the topic named by the site cid from the task's input section and
+  * uses a freshly generated group id, so concurrent workers do not take each other's records.
+  *
+  * @param paramMap task request; its {@code Constants.INPUT} entry must hold the site cid
+  */
+ private void sendResult(Map paramMap){
+     // Input section of the request and the site cid it carries.
+     Map inputConfig = (Map) paramMap.get(Constants.INPUT);
+     String siteCid = (String) inputConfig.get(Constants.CID);
+     // Random group id per worker.
+     String randomGroupId = UUID.randomUUID().toString().replace("-", "");
+     KafkaConsumerProcess worker = new KafkaConsumerProcess(
+             bootstrapServers, randomGroupId, siteCid, paramMap, webUrlProfix);
+     new Thread(worker).start();
+ }
+
+
+ /**
+ * 任务下发到采集平台
+ * @param responsePo
+ * @param params
+ */
+ private void addTask(ResponsePo responsePo,Map params){
+ try {
+ Map input = (Map) params.get(Constants.INPUT);
+ TaskParam taskParam = new TaskParam();
+ //唯一标识
+ String id = UUID.randomUUID().toString();
+ taskParam.setId(id);
+ //网站siteid
+ Integer siteId = (Integer) input.get(Constants.SITEID);
+ taskParam.setSiteId(siteId);
+ //站点类型
+ Integer siteType = (Integer) input.get(Constants.SITETYPE);
+ taskParam.setSiteType(siteType);
+ //采集频率
+ Integer crawlCyclicityTime = Constants.DEFULT_CRAWLCYCLICITYTIME;
+ taskParam.setCrawlCyclicityTime(crawlCyclicityTime);
+ //采集结束时间
+ Long crawlEndTime = (Long) input.get(Constants.CRAWLENDTIME);
+ taskParam.setCrawlEndTime(crawlEndTime);
+ //采集模式
+ Integer crawlMode = 1;
+ taskParam.setCrawlMode(crawlMode);
+ //页面类型
+ List crawlPageTypes = (List) input.get(Constants.CRAWLPAGETYPES);
+ taskParam.setCrawlPageTypes(crawlPageTypes);
+ //增量时间范围
+ Integer crawlPeriodHour = Constants.DEFULT_CRAWLPERIODHOUR;
+ taskParam.setCrawlPeriodHour(crawlPeriodHour);
+ //采集开始时间
+ Long crawlStartTime = (Long) input.get(Constants.CRAWLSTARTTIME);
+ taskParam.setCrawlStartTime(crawlStartTime);
+ //创建者id
+ String createUserId = params.get(Constants.CREATEUSERID).toString();
+ taskParam.setCreateUserId(createUserId);
+ //批量
+ Integer isBatch = 0;
+ taskParam.setIsBatch(isBatch);
+ //翻页上限
+ Integer maxPageNum = 1;
+ taskParam.setMaxPageNum(maxPageNum);
+ //专题id
+ Integer subjectId = Integer.valueOf(params.get(Constants.SOURCE_DATA_ID).toString());
+ taskParam.setSubjectId(subjectId);
+ //任务类型
+ Integer taskType = (Integer) input.get(Constants.TASKTYPE);
+ taskParam.setTaskType(taskType);
+ //关键词
+ String keyWords = ((List)input.get(Constants.CRAWLKEYWORD)).stream().collect(Collectors.joining(" "));
+ taskParam.setCrawlKeyword(keyWords);
+ //标签
+ taskParam.setAttachTag(params.get(Constants.BUSINESSKEY).toString());
+ //网站cid
+ String cid = (String) input.get(Constants.CID);
+ taskParam.setCid(cid);
+
+ Long time = new Date().getTime();
+ taskParam.setTime(time);
+ String appId = Constants.APPID;
+ taskParam.setAppId(appId);
+ //签名
+ String sign = OtherUtils.getMd5(appId + createUserId + time);
+ taskParam.setSign(sign);
+ System.out.println(JSONObject.toJSONString(taskParam));
+ String html = DownLoadUtil.doPost(Constants.CRAWL_ADD_URL, JSONObject.toJSONString(taskParam));
+ if(html.contains(Constants.REQUEST_ERROR_MESSAGE)){
+ log.error("任务下发失败,{}",html);
+ responsePo.setCode(ResponseCode.FAILADDTASK.getCode());
+ responsePo.setMessage(ResponseCode.FAILADDTASK.getMessage());
+ }else{
+ log.info("任务下发结果:{}",html);
+ Map result = JSONObject.parseObject(html);
+ if(result.get(Constants.CODE).equals(Constants.SUCCESS_CODE)){
+ log.info("任务下发成功:{}",html);
+ String crawlDataFlag = null;
+ //构造crawldataflag
+ if(taskType == Constants.KEYWORD_TYPE){
+ crawlDataFlag = Constants.KEYWORD_PREFIX + keyWords;
+ }else if(taskType == Constants.USER_TYPE){
+ crawlDataFlag = Constants.ACCOUNT_PREFIX + keyWords;
+ }else {
+ crawlDataFlag = Constants.URL_PREFIX + keyWords;
+ }
+ //下发成功,写入缓存(数据获取流程用)
+ String redisCacheKey = (cid + Constants.REDISKEY_MIDDLE + crawlDataFlag).toLowerCase();
+ //判断key是否存在
+ boolean isKey = isKeyExists(redisCacheKey);
+ //缓存value存储格式 外层
+ List