From 952a55339b83f6e94a36935b64b0d3f23bbb1f4c Mon Sep 17 00:00:00 2001 From: maojian <550076202@qq.com> Date: Sat, 8 Feb 2025 17:52:31 +0800 Subject: [PATCH] =?UTF-8?q?=E9=99=84=E4=BB=B6=E8=A1=A5=E5=85=85=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .classpath | 40 + .project | 23 + .settings/org.eclipse.core.resources.prefs | 5 + .settings/org.eclipse.jdt.core.prefs | 9 + .settings/org.eclipse.m2e.core.prefs | 4 + pom.xml | 221 ++++++ src/main/java/com/bw/fileDownload/Application.java | 27 + .../com/bw/fileDownload/cache/ConfigCache.java | 38 + .../fileDownload/components/SpringBootKafka.java | 46 ++ .../com/bw/fileDownload/config/KafkaConfig.java | 20 + .../java/com/bw/fileDownload/entity/Constants.java | 78 ++ .../com/bw/fileDownload/entity/FileInfoPo.java | 63 ++ .../com/bw/fileDownload/handler/MainHandler.java | 244 ++++++ .../process/TaskDistributionProcess.java | 76 ++ .../fileDownload/service/DownloadExecService.java | 873 +++++++++++++++++++++ .../fileDownload/service/FileDownloadService.java | 30 + .../service/impl/FileDownloadServiceImpl.java | 160 ++++ .../java/com/bw/fileDownload/utils/FileUtil.java | 30 + .../java/com/bw/fileDownload/utils/StringUtil.java | 122 +++ src/main/resources/application.yml | 103 +++ src/main/resources/logback-spring.xml | 36 + src/test/java/com/bw/AppTest.java | 38 + target/.gitignore | 2 + 23 files changed, 2288 insertions(+) create mode 100644 .classpath create mode 100644 .project create mode 100644 .settings/org.eclipse.core.resources.prefs create mode 100644 .settings/org.eclipse.jdt.core.prefs create mode 100644 .settings/org.eclipse.m2e.core.prefs create mode 100644 pom.xml create mode 100644 src/main/java/com/bw/fileDownload/Application.java create mode 100644 src/main/java/com/bw/fileDownload/cache/ConfigCache.java create mode 100644 src/main/java/com/bw/fileDownload/components/SpringBootKafka.java create mode 100644 src/main/java/com/bw/fileDownload/config/KafkaConfig.java create mode 100644 src/main/java/com/bw/fileDownload/entity/Constants.java create mode 100644 src/main/java/com/bw/fileDownload/entity/FileInfoPo.java create mode 100644 src/main/java/com/bw/fileDownload/handler/MainHandler.java create mode 100644 src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java create mode 100644 src/main/java/com/bw/fileDownload/service/DownloadExecService.java create mode 100644 src/main/java/com/bw/fileDownload/service/FileDownloadService.java create mode 100644 src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java create mode 100644 src/main/java/com/bw/fileDownload/utils/FileUtil.java create mode 100644 src/main/java/com/bw/fileDownload/utils/StringUtil.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/logback-spring.xml create mode 100644 src/test/java/com/bw/AppTest.java create mode 100644 target/.gitignore diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..f7e4a1d --- /dev/null +++ b/.classpath @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..f26865d --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + FileQueueSync + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..839d647 --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding//src/test/java=UTF-8 +encoding/=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..71df522 --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,9 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.methodParameters=generate +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore +org.eclipse.jdt.core.compiler.release=disabled +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9bc62e8 --- /dev/null +++ b/pom.xml @@ -0,0 +1,221 @@ + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.2.4.RELEASE + + com.bw + FileQueueSync + 0.0.1-SNAPSHOT + jar + + FileQueueSync + http://maven.apache.org + + + UTF-8 + 1.8 + 1.8 + + + + + junit + junit + 4.11 + test + + + org.springframework.boot + spring-boot-starter-web + + + + de.codecentric + spring-boot-admin-starter-client + 2.2.4 + + + com.google.code.gson + gson + 2.8.8 + + + org.springframework.boot + spring-boot-test + + + + org.springframework + spring-test + 5.0.10.RELEASE + test + + + commons-io + commons-io + 1.4 + + + com.alibaba + fastjson + 2.0.17 + + + + com.squareup.okhttp3 + okhttp + 4.9.3 + + + org.apache.httpcomponents + httpclient + 4.5.3 + + + commons-lang + commons-lang + 2.6 + + + + org.jetbrains.kotlin + kotlin-reflect + 1.6.21 + runtime + + + + org.jsoup + jsoup + 1.8.1 + + + + org.projectlombok + lombok + + + + org.springframework.kafka + spring-kafka + + + + junit + junit + + + + p6spy + p6spy + 3.9.0 + + + + commons-collections + commons-collections + 3.2.2 + + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.bw.fileDownload.Application + ZIP + + + ${project.groupId} + ${project.artifactId} + + + + + + + repackage + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.1 + + + copy + package + + copy-dependencies + + + jar + jar + runtime + ${project.build.directory}/libs + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/bw/fileDownload/Application.java b/src/main/java/com/bw/fileDownload/Application.java new file mode 100644 index 0000000..3e0a7f7 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/Application.java @@ -0,0 +1,27 @@ +package com.bw.fileDownload; + + + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * 主入口 + * + * @author jian.mao + * @date 2023年7月4日 + * @description + */ +@SpringBootApplication +@EnableScheduling +@EnableKafka +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + +} diff --git a/src/main/java/com/bw/fileDownload/cache/ConfigCache.java b/src/main/java/com/bw/fileDownload/cache/ConfigCache.java new file mode 100644 index 0000000..41e43e2 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/cache/ConfigCache.java @@ -0,0 +1,38 @@ +package com.bw.fileDownload.cache; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * @author jian.mao + * @date 2022年11月11日 + * @description 静态变量类 + */ +@Slf4j +public class ConfigCache { + + /**启动条件**/ + public static boolean isStart = true; + /*****视频任务队列*****/ + public static LinkedBlockingDeque> videoTaskQueue = null; + /*****文件or图片任务队列*****/ + public static LinkedBlockingDeque> fileTaskQueue = null; + /*****无附件队列队列*****/ + public static LinkedBlockingDeque> notFileTaskQueue = null; + + /** + * 队列录入任务 + * @param queue + * @param task + */ + public static void putQueue(LinkedBlockingDeque> queue,Map task){ + //next app 写入队列准备调出 + try { + queue.put(task); + } catch (InterruptedException e) { + log.error("队列写入data失败---"); + } + } +} diff --git a/src/main/java/com/bw/fileDownload/components/SpringBootKafka.java b/src/main/java/com/bw/fileDownload/components/SpringBootKafka.java new file mode 100644 index 0000000..73d6e41 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/components/SpringBootKafka.java @@ -0,0 +1,46 @@ +package com.bw.fileDownload.components; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * @PROJECT_NAME: companybusinesscrawl + * @DESCRIPTION:SpringBootKafka 工具类 + * @AUTHOR: ying.zhao + * @DATE: 2023/4/6 11:09 + */ +@Slf4j +@Component +public class SpringBootKafka { + @Autowired + private KafkaTemplate kafkaTemplate; + /** + * 自定义topicKafkaTemplate + */ + /** + * public static final String TOPIC = "companyBussTest"; + **/ + public void send(String topic, String message) { + //发送消息 + ListenableFuture> future = kafkaTemplate.send(topic, message); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + //发送失败的处理 + log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult stringObjectSendResult) { + //成功的处理 + log.info(topic + " - 生产者 发送消息成功" ); + } + }); + } +} diff --git a/src/main/java/com/bw/fileDownload/config/KafkaConfig.java b/src/main/java/com/bw/fileDownload/config/KafkaConfig.java new file mode 100644 index 0000000..fa4a0ca --- /dev/null +++ b/src/main/java/com/bw/fileDownload/config/KafkaConfig.java @@ -0,0 +1,20 @@ +package com.bw.fileDownload.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +/** + * kafka配置类 + * @author jian.mao + * @date 2023年7月6日 + * @description + */ +@Configuration +public class KafkaConfig { + @Value("${kafka.consumer.topic}") + private String kafkaTopic; + + public String getKafkaTopic() { + return kafkaTopic; + } +} diff --git a/src/main/java/com/bw/fileDownload/entity/Constants.java b/src/main/java/com/bw/fileDownload/entity/Constants.java new file mode 100644 index 0000000..1c51004 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/entity/Constants.java @@ -0,0 +1,78 @@ +package com.bw.fileDownload.entity; + +/** + * 常量类 + * @author jian.mao + * @date 2025年2月5日 + * @description + */ +public class Constants { + /******采集引擎任务体常量key**********/ + public final static String TASKDATA = "taskdata"; + /******采集引擎下载体常量key**********/ + public final static String SPIDERDATA = "spiderdata"; + /******采集引擎解析体常量key**********/ + public final static String PARSEDATA = "parsedata"; + /******采集标签常量key**********/ + public final static String PAGE_SWITCHS = "page_switchs"; + /******文件下载表示常量key*******/ + public static final String DOWNLOADFILE = "downloadFile"; + /******视频下载表示常量key*******/ + public static final String DOWNLOADVIDEO = "downloadVideo"; + /******图片下载表示常量key*******/ + public static final String DOWNLOADPIC = "downloadPic"; + /******data常量******/ + public static final String DATA = "data"; + /******文件链接存储key******/ + public static final String FILELIST = "fileList"; + /******图片链接存储key******/ + public static final String IMGLIST = "imgList"; + /******视频链接存储key******/ + public static final String VIDEOLIST = "videoList"; + + public static final String CID = "cid"; + public static final String URL = "url"; + + + public static String MSGQUEUENAME = "msgqueuename"; + + public static String VIDEO_PATH = "videoList"; + + public static String FILE_PATH = "fileList"; + + public static String IMAGE_PATH = "imgList"; + + public static Integer NO_DOWNLOAD_URL = 402001; + + public static Integer NO_DOWNLOAD_TAG = 402002; + + public static Integer DOWNLOAD_VIDEO_FAIL = 402003; + + public static Integer DOWNLOAD_IMAGE_FAIL = 402004; + + public static Integer DOWNLOAD_FILE_FAIL = 402005; + + public static Integer UPLOAD_VIDEO_FAIL = 402006; + + public static Integer UPLOAD_IMAGE_FAIL = 402007; + + public static Integer UPLOAD_FILE_FAIL = 402008; + + public static Integer VIDEO_LIST_NULL = 402009; + + public static Integer IMAGE_LIST_NULL = 402010; + + public static Integer FILE_LIST_NULL = 402011; + + public static Integer REDIS_ERROR = 402012; + + public static Integer URL_FORMAT = 402012; + + public static Integer DOWNLOAD_STANDARD = 402013; + + public static Integer DOWNLOAD_SUCCESS = 0; + + public static Integer DOWNLOAD_NOT_IMAGE = 402014; + + public static String DOWNLOAD_STATUS_CODE = "downloadStatusCode"; +} diff --git a/src/main/java/com/bw/fileDownload/entity/FileInfoPo.java b/src/main/java/com/bw/fileDownload/entity/FileInfoPo.java new file mode 100644 index 0000000..eb4ef72 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/entity/FileInfoPo.java @@ -0,0 +1,63 @@ +package com.bw.fileDownload.entity; + +import com.alibaba.fastjson.JSON; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author:jinming + * @className:FileInfoPo + * @version:1.0 + * @description: + * @Date:2023/2/28 10:13 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FileInfoPo { + /** + * 文件原始Url + */ + private String fileUrl; + /** + * gofast下载url + */ + private String goFastUrl; + /** + * 文件名 + */ + private String fileName; + /** + * 文件类型 + */ + private String fileType; + /** + * 图片尺寸 + */ + private String resolution; + /** + * 文件大小 + */ + private String size; + /** + * 视频时长 + */ + private String videoTime; + /** + * 文件得MD5值 + */ + private String md5; + /** + * 文件的删除地址 + */ + private String delUrl; + /** + * 文件的后缀地址 + */ + private String src; + + public String toJsonString() { + return JSON.toJSONString(this); + } +} \ No newline at end of file diff --git a/src/main/java/com/bw/fileDownload/handler/MainHandler.java b/src/main/java/com/bw/fileDownload/handler/MainHandler.java new file mode 100644 index 0000000..5ff68ad --- /dev/null +++ b/src/main/java/com/bw/fileDownload/handler/MainHandler.java @@ -0,0 +1,244 @@ +package com.bw.fileDownload.handler; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import com.alibaba.fastjson.JSONObject; +import com.bw.fileDownload.cache.ConfigCache; +import com.bw.fileDownload.service.FileDownloadService; +import com.bw.fileDownload.utils.FileUtil; + +import lombok.extern.slf4j.Slf4j; + +/** + * 执行入口 + * @author jian.mao + * @date 2025年2月5日 + * @description + */ +@Component +@Slf4j +public class MainHandler implements ApplicationRunner{ + + @Autowired + private FileDownloadService fileDownloadService; + /***线程池参数***/ + @Value("${threadPool.corePoolSize}") + private int corePoolSize; + @Value("${threadPool.maximumPoolSize}") + private int maximumPoolSize; + @Value("${threadPool.keepAliveTime}") + private long keepAliveTime; + @Value("${threadPool.queueSize}") + private int threadQueueSize; + @Value("${task.queue-size}") + private Integer taskQueueSize; + + @Value("${task.video-task-queue-path}") + private String videoTaskPath; + @Value("${task.docOrImg-task-queue-path}") + private String docOrImgTaskPath; + @Value("${task.notFile-task-queue-path}") + private String notFileTaskPath; + @Override + public void run(ApplicationArguments args) throws Exception { + //设置队列长度 + if(taskQueueSize == null || taskQueueSize == 0){ + taskQueueSize = 10000; + } + /*****视频任务队列*****/ + ConfigCache.videoTaskQueue = new LinkedBlockingDeque>(taskQueueSize); + /*****文档or图片任务队列*****/ + ConfigCache.fileTaskQueue = new LinkedBlockingDeque>(taskQueueSize); + /*****无附件任务队列*****/ + ConfigCache.notFileTaskQueue = new LinkedBlockingDeque>(taskQueueSize); + //视频线程池 + ThreadPoolExecutor videoExecutor = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(threadQueueSize), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + //视频任务执行 + Thread videoConsumer = new Thread(() -> { + while(ConfigCache.isStart) { + try { + Map task = ConfigCache.videoTaskQueue.take(); + videoExecutor.execute(()-> videoDownloadExec(task)); + } catch (Exception e) { + log.error("视频任务执行异常------e:",e); + } + } + }); + videoConsumer.start(); + log.info("视频任务执行线程启动------"); + //文件or图片线程池 + ThreadPoolExecutor docOrImgExecutor = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(threadQueueSize), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + //文件or图片任务执行 + Thread docOrImgConsumer = new Thread(() -> { + while(ConfigCache.isStart) { + try { + Map task = ConfigCache.fileTaskQueue.take(); + docOrImgExecutor.execute(()-> docOrImgDownloadExec(task)); + } catch (Exception e) { + // TODO: handle exception + log.error("文件or图片任务执行异常------e:",e); + } + } + }); + docOrImgConsumer.start(); + log.info("文件图片任务执行线程启动-----"); + //无附件线程池 + ThreadPoolExecutor notFileExecutor = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(threadQueueSize), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + //无附件任务执行 + Thread notFileConsumer = new Thread(() -> { + while(ConfigCache.isStart) { + try { + Map task = ConfigCache.notFileTaskQueue.take(); + notFileExecutor.execute(()-> notFileDownloadExec(task)); + } catch (Exception e) { + // TODO: handle exception + log.error("无附件任务执行异常------e:",e); + } + } + }); + notFileConsumer.start(); + log.info("无附件任务执行线程启动-----"); + //启动加载缓存任务 + readTask(videoTaskPath, ConfigCache.videoTaskQueue); + readTask(docOrImgTaskPath, ConfigCache.fileTaskQueue); + readTask(notFileTaskPath, ConfigCache.notFileTaskQueue); + //停止处理 + waitDown(); + } + + private void videoDownloadExec(Map task) { + fileDownloadService.video(task); + } + private void docOrImgDownloadExec(Map task) { + fileDownloadService.docOrImg(task); + } + private void notFileDownloadExec(Map task) { + fileDownloadService.notFile(task); + } + + /****************************************************************load******************************************************************************/ + /** + * 加载文件中的任务 + * @param path 文件地址 + * @param queue 队列 + */ + @SuppressWarnings("unchecked") + public static void readTask(String path, LinkedBlockingDeque> queue) { + File file = new File(path); + if (file.exists()) { + List tasks = null; + try { + tasks = FileUtils.readLines(file, "UTF-8"); + } catch (IOException e) { + e.printStackTrace(); + } + for (String taskStr : tasks) { + Map task = JSONObject.parseObject(taskStr); + try { + queue.put(task); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + file.delete(); + } + } + + /*******************************************************************stop************************************************************************/ + + /** + * 结束触发钩子 + */ + public void waitDown() { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // 停止线程 + ConfigCache.isStart = false; + log.info("stop-------"); + writeTsskToFile(); + } + }); + } + + + /** + * 任务持久化到硬盘 + */ + public void writeTsskToFile() { + while (true) { + if (ConfigCache.videoTaskQueue.size() > 0) { + try { + Map task = ConfigCache.videoTaskQueue.take(); + FileUtil.writeFile(videoTaskPath ,JSONObject.toJSONString(task)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + log.info("taskQueue write is file end"); + break; + } + } + while (true) { + if (ConfigCache.fileTaskQueue.size() > 0) { + try { + Map task = ConfigCache.fileTaskQueue.take(); + FileUtil.writeFile(docOrImgTaskPath, JSONObject.toJSONString(task)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + log.info("taskQueue write is file end"); + break; + } + } + while (true) { + if (ConfigCache.notFileTaskQueue.size() > 0) { + try { + Map task = ConfigCache.notFileTaskQueue.take(); + FileUtil.writeFile(notFileTaskPath, JSONObject.toJSONString(task)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + log.info("taskQueue write is file end"); + break; + } + } + } +} diff --git a/src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java b/src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java new file mode 100644 index 0000000..6b0279a --- /dev/null +++ b/src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java @@ -0,0 +1,76 @@ +package com.bw.fileDownload.process; + +import java.util.List; +import java.util.Map; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import com.alibaba.fastjson.JSONObject; +import com.bw.fileDownload.cache.ConfigCache; +import com.bw.fileDownload.entity.Constants; + +import lombok.extern.slf4j.Slf4j; + +/** + * 任务读取并分配 + * @author jian.mao + * @date 2025年2月5日 + * @description + */ +@Component +@Slf4j +public class TaskDistributionProcess { + + @KafkaListener(topics = "#{kafkaConfig.getKafkaTopic()}") + public void consumeMessage(String message) { + Map result = JSONObject.parseObject(message); + taskSplit(result); + } + + /** + * 任务拆分(文件or图片、视频、无附件) + * @param task + */ + @SuppressWarnings("unchecked") + private void taskSplit(Map task) { + //先判断下载标签 + Map taskdata = (Map) task.get(Constants.TASKDATA); + Map pageSwitchs = (Map) taskdata.get(Constants.PAGE_SWITCHS); + Map parsedata = (Map) task.get(Constants.PARSEDATA); + //解析后的数据体 + Map data = (Map) parsedata.get(Constants.DATA); + //判断视频标识 + if(pageSwitchs.containsKey(Constants.DOWNLOADVIDEO) && (boolean)pageSwitchs.get(Constants.DOWNLOADVIDEO)) { + //获取所需下载文件链接 + List videList = (List) data.get(Constants.VIDEOLIST); + if(videList != null && videList.size() > 0) { + //有视频链接 + ConfigCache.putQueue(ConfigCache.videoTaskQueue, task); + return; + } + } + //判断图片标识 + if(pageSwitchs.containsKey(Constants.DOWNLOADPIC) && (boolean)pageSwitchs.get(Constants.DOWNLOADPIC)) { + //获取所需下载文件链接 + List imgList = (List) data.get(Constants.IMGLIST); + if(imgList != null && imgList.size() > 0) { + //有图片链接 + ConfigCache.putQueue(ConfigCache.fileTaskQueue, task); + return; + } + } + //判断文件标识 + if(pageSwitchs.containsKey(Constants.DOWNLOADFILE) && (boolean)pageSwitchs.get(Constants.DOWNLOADFILE)) { + //获取所需下载文件链接 + List fileList = (List) data.get(Constants.FILELIST); + if(fileList != null && fileList.size() > 0) { + //有文件链接 + ConfigCache.putQueue(ConfigCache.fileTaskQueue, task); + return; + } + } + //即没视频又没文件 + ConfigCache.putQueue(ConfigCache.notFileTaskQueue, task); + } +} diff --git a/src/main/java/com/bw/fileDownload/service/DownloadExecService.java b/src/main/java/com/bw/fileDownload/service/DownloadExecService.java new file mode 100644 index 0000000..46bf67e --- /dev/null +++ b/src/main/java/com/bw/fileDownload/service/DownloadExecService.java @@ -0,0 +1,873 @@ +package com.bw.fileDownload.service; + +import java.awt.image.BufferedImage; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.math.BigDecimal; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bw.fileDownload.entity.Constants; +import com.bw.fileDownload.entity.FileInfoPo; +import com.bw.fileDownload.utils.StringUtil; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.Call; +import okhttp3.Headers; +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; + +@Component +@Slf4j +public class DownloadExecService { + + @Value("${file.download.path}") + private String downloadFilePath; + @Value("${file.upload.url}") + private String gofastUrl; + + private static final String IMAGE_TYPE_JPG = "jpg"; + private static final String IMAGE_TYPE_JPEG = "jpeg"; + private static final String IMAGE_TYPE_PNG = "png"; + private static OkHttpClient client = new OkHttpClient(); + public Map downloadAndUploadVidoAndAudio(List videoList, Map resultData) { + String header = "header"; + Map hederMap = new HashMap<>(); + List videoPath = new ArrayList<>(); + List delVideoAndAudioList = new ArrayList<>(); + Map returnMap = new HashMap<>(32); + List videoUrl = new ArrayList<>(); + List videoPathSize = new ArrayList<>(); + + Integer downloadStatusCode = 0; + for (String url : videoList) { + Map delVideoAndAudioMap = new HashMap(32); + Map videoMap = new HashMap(32); + Map videoPathMap = new HashMap(32); + if (formatJudgment(url)) { + downloadStatusCode = Constants.URL_FORMAT; + continue; + } + String[] split = url.split("###"); + String videoType = split[1]; + String fileType = videoType; + String m3u8Type = ".m3u8"; + String youtubeTypeFr = "youtube"; + String youtubeTypeSec = "youtu.be"; + String needDownloadVideoUrl = split[0]; + try { + if (!StringUtil.hasValue(needDownloadVideoUrl)) { + downloadStatusCode = Constants.VIDEO_LIST_NULL; + continue; + } + if (m3u8Type.equals(fileType)) { + downLoadVideoExeCommandForFfmpeg(needDownloadVideoUrl); + } else if (url.contains(youtubeTypeFr) || url.contains(youtubeTypeSec)) { + downLoadVideoExeCommandLp(needDownloadVideoUrl, videoType); + } else { + File file = new File(downloadFilePath + getMd5(needDownloadVideoUrl).concat(fileType)); + if (resultData.containsKey(header)) { + hederMap = (Map) resultData.get(header); + downloadFile(needDownloadVideoUrl, downloadFilePath, getMd5(needDownloadVideoUrl).concat(fileType), hederMap); + } else { + downloadFile(needDownloadVideoUrl, downloadFilePath, getMd5(needDownloadVideoUrl).concat(fileType)); + if (!file.exists() || file.length() < 100) { + downLoadFileExeCommandForWGet(needDownloadVideoUrl, fileType); + } + } + } + } catch (Exception e) { + downloadStatusCode = Constants.DOWNLOAD_VIDEO_FAIL; + log.error("Video or audio download failed url:{},Exception:",url,e); + continue; + } + String delVideoAndAudioUrl = ""; + String uploadFileUrl = ""; + try { + Map videoTimeMap = getVideoTime(downloadFilePath + getMd5(needDownloadVideoUrl) + videoType); + String videoTime = videoTimeMap.get("videoTime"); + String resolution = videoTimeMap.get("resolution"); + videoPathMap.put("videoTime", videoTime); + videoPathMap.put("resolution", resolution); + String uploadResultJson = ""; + + if (m3u8Type.equals(fileType)) { + uploadResultJson = upLoadFile(downloadFilePath + getMd5(needDownloadVideoUrl) + ".mp4"); + } else { + uploadResultJson = upLoadFile(downloadFilePath + getMd5(needDownloadVideoUrl) + videoType); + } + JSONObject jsonObject = JSON.parseObject(uploadResultJson); + String md5 = jsonObject.getString("md5"); + delVideoAndAudioUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5; + uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", ""); + String src = jsonObject.getString("src"); + Long size = jsonObject.getLong("size"); + String printSize = getPrintSize(size); + videoPathMap.put("size", printSize); + videoPathMap.put("url", src); + + } catch (Exception e) { + downloadStatusCode = Constants.UPLOAD_VIDEO_FAIL; + log.error("Video upload failed url:{},Exception:",url,e); + continue; + } + videoPath.add(uploadFileUrl); + delVideoAndAudioMap.put(uploadFileUrl, delVideoAndAudioUrl); + delVideoAndAudioList.add(delVideoAndAudioMap); + videoMap.put("originalUrl", needDownloadVideoUrl); + videoMap.put("gofastUrl", uploadFileUrl); + videoUrl.add(videoMap); + videoPathSize.add(videoPathMap); + } + returnMap.put("videoUrl", videoUrl); + returnMap.put("videoPath", videoPath); + returnMap.put("delVideoAndAudioList", delVideoAndAudioList); + returnMap.put("downloadStatusCode", downloadStatusCode); + resultData.put("ErroeMessage", downloadStatusCode); + returnMap.put("videoPathSize", videoPathSize); + resultData.put("videoUrl", videoUrl); + resultData.put("videoPath", videoPath); + resultData.put("delVideoAndAudioList", delVideoAndAudioList); + resultData.put("videoPathSize", videoPathSize); + return returnMap; + } + private boolean formatJudgment(String url) { + String formatTag = "###"; + if (!url.contains(formatTag)) { + return true; + } + return false; + } + + private void downLoadVideoExeCommandForFfmpeg(String videoUrl) { + String news_id = getMd5(videoUrl); + String command = "ffmpeg -i " + videoUrl + " " + downloadFilePath + + news_id + ".mp4"; + InputStream in = null; + BufferedReader read = null; + String Qavg = "Qavg:"; + try { + System.out.println("DownloadUtil" + command); + Process process; + try { + String line = ""; + File file = new File(downloadFilePath); + if (!file.exists()) { + file.mkdirs(); + } + process = Runtime.getRuntime().exec(command); + in = process.getErrorStream(); + read = new BufferedReader(new InputStreamReader(in)); + while ((line = read.readLine()) != null) { + System.out.println("DownloadUtil" + line); + if (line.contains(Qavg)) { + break; + } + } + process.destroy(); + } catch (IOException var26) { + var26.printStackTrace(); + } + } finally { + try { + in.close(); + read.close(); + } catch (IOException var25) { + var25.printStackTrace(); + } + } + } + + private void downLoadVideoExeCommandLp(String videoUrl, String videoType) { + String command = ""; + List stringList = doGetVideoIdCommand(videoUrl); + for (String s : stringList) { + String videoId = s.split(" ")[0]; + command = "yt_dlp --no-check-certificate -f " + videoId + " -o " + downloadFilePath + getMd5(videoUrl) + videoType + " " + videoUrl; + log.info("视频下载命令:" + command); + break; + } + InputStream in = null; + BufferedReader read = null; + try { + Process process; + try { + String line = ""; + File file = new File(downloadFilePath); + if (!file.exists()) { + file.mkdirs(); + } + process = Runtime.getRuntime().exec(command, (String[]) null, file); + in = process.getInputStream(); + read = new BufferedReader(new InputStreamReader(in)); + while ((line = read.readLine()) != null) { + System.out.println(line); + } + process.destroy(); + } catch (IOException var26) { + var26.printStackTrace(); + } + } finally { + try { + in.close(); + read.close(); + } catch (IOException var25) { + var25.printStackTrace(); + } + } + } + + private void downLoadFileExeCommandForWGet(String fileUrl, String filetype) { + String news_id = getMd5(fileUrl); + String htpps = "https"; + String command = "wget -d --user-agent=\"Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0\" " + fileUrl + " -O " + downloadFilePath + news_id + filetype; + if (!fileUrl.contains(htpps)) { + command = "wget --no-check-certificate -d --user-agent=\"Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0\" " + fileUrl + " -O " + downloadFilePath + news_id + filetype; + + } + InputStream in = null; + BufferedReader read = null; + try { + log.info("wget 方法进入:" + command); + Process process; + try { + String line = ""; + process = Runtime.getRuntime().exec(command); + log.info("执行命令"); + log.info("视频下载命令:" + command); + in = process.getErrorStream(); + read = new BufferedReader(new InputStreamReader(in)); + while ((line = read.readLine()) != null) { + log.info("视频下载进度:" + line); + } + process.destroy(); + } catch (IOException var26) { + var26.printStackTrace(); + log.info("下载视频异常", var26); + } + } finally { + try { + in.close(); + read.close(); + } catch (IOException var25) { + var25.printStackTrace(); + } + } + } + public String getMd5(String string) { + try { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] bs = md5.digest(string.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(40); + for (byte x : bs) { + if ((x & 0xff) >> 4 == 0) { + sb.append("0").append(Integer.toHexString(x & 0xff)); + } else { + sb.append(Integer.toHexString(x & 0xff)); + } + } + return sb.toString(); + } catch (Exception e) { + log.info("get md 5 exception:", e); + return "nceaform" + System.currentTimeMillis(); + } + } + private List doGetVideoIdCommand(String videoUrl) { + List resultList = new ArrayList(); + String command = "yt_dlp --no-check-certificate -F " + videoUrl; + InputStream in = null; + BufferedReader read = null; + try { + Process process; + try { + String line = ""; + process = Runtime.getRuntime().exec(command, (String[]) null); + in = process.getInputStream(); + read = new BufferedReader(new InputStreamReader(in, "gbk")); + while ((line = read.readLine()) != null) { + System.out.println(line); + if (line.contains("mp4")) { + if (line.contains("256x144") && !line.contains("video only")) { + resultList.add(line); + break; + } else if (line.contains("640x360") && !line.contains("video only")) { + resultList.add(line); + break; + } else if (line.contains("854x480") && !line.contains("video only")) { + resultList.add(line); + break; + } else if (line.contains("1280x720") && !line.contains("video only")) { + resultList.add(line); + break; + } + } + } + process.destroy(); + } catch (IOException var26) { + var26.printStackTrace(); + } + } finally { + try { + in.close(); + read.close(); + } catch (IOException var25) { + var25.printStackTrace(); + } + } + return resultList; + } + public void downloadFile(String fileUrl, String savePath, String customFileName, Map... headers) throws IOException { + okhttp3.Request.Builder builder = new okhttp3.Request.Builder(); + if (headers != null && headers.length > 0) { + Map tempHeaders = headers[0]; + Iterator iterator = tempHeaders.keySet().iterator(); + while (iterator.hasNext()) { + String key = iterator.next(); + builder.addHeader(key, tempHeaders.get(key)); + } + } else { + builder.addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7").addHeader("Accept-Language", "zh-CN,zh;q=0.9").addHeader("Cache-Control", "no-cache").addHeader("Connection", "keep-alive").addHeader("Pragma", "no-cache").addHeader("Sec-Fetch-Dest", "document").addHeader("Sec-Fetch-Mode", "navigate").addHeader("Sec-Fetch-Site", "none").addHeader("Sec-Fetch-User", "?1").addHeader("Upgrade-Insecure-Requests", "1").addHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36").addHeader("sec-ch-ua", "\"Google Chrome\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"").addHeader("sec-ch-ua-mobile", "?0").addHeader("sec-ch-ua-platform", "\"Windows\""); + } + Request request = builder.url(fileUrl).get().build(); + Call call = client.newCall(request); + Response response = call.execute(); + if (!response.isSuccessful()) { + throw new IOException("Failed to download file: " + response); + } + + ResponseBody responseBody = response.body(); + if (responseBody != null) { + InputStream inputStream = responseBody.byteStream(); + File fileDirectory = new File(savePath); + + if (!fileDirectory.exists()) { + fileDirectory.mkdirs(); + } + + File file = new File(fileDirectory, customFileName); + FileOutputStream outputStream = new FileOutputStream(file); + + byte[] buffer = new byte[4096]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + outputStream.flush(); + outputStream.close(); + inputStream.close(); + } + } + private Map getVideoTime(String video_path) { + Map infoMap = new HashMap(32); + String videoTime = ""; + String resolution = ""; + String command = "ffmpeg -i " + video_path; + InputStream in = null; + BufferedReader read = null; + try { + String info = ""; + String line = ""; + System.out.println("视频时间解析命令" + command); + Process process; + process = Runtime.getRuntime().exec(command); + in = process.getErrorStream(); + read = new BufferedReader(new InputStreamReader(in)); + while ((line = read.readLine()) != null) { + info = info + line; + } + process.destroy(); + + String regexDuration = "Duration: (.*?), start: (.*?), bitrate: (\\d*) kb\\/s"; + Pattern pattern = Pattern.compile(regexDuration); + Matcher m = pattern.matcher(info); + if (m.find()) { + videoTime = getTimelen(m.group(1)) + "s"; + System.out.println("视频时长:" + videoTime + "s , 开始时间:" + m.group(2) + ", 比特率:" + m.group(3) + "kb/s"); + } + String regexVideo = "Video: (.*?), (.*?), (\\d+x\\d+)[,\\s]"; + pattern = Pattern.compile(regexVideo); + m = pattern.matcher(info); + if (m.find()) { + resolution = m.group(3); + System.out.println("编码格式:" + m.group(1) + ", 视频格式:" + m.group(2) + ", 分辨率:" + resolution + "kb/s"); + } + } catch (Exception e) { + e.printStackTrace(); + } + infoMap.put("videoTime", videoTime); + infoMap.put("resolution", resolution); + return infoMap; + } + private String upLoadFile(String filePath) { + File file = new File(filePath); + String realFilename = filePath.substring(filePath.lastIndexOf(File.separator) + 1); + MultipartBody.Builder builder = new MultipartBody.Builder().setType(MultipartBody.FORM); + builder.addPart(Headers.of("Content-Disposition", "form-data; name=\"file\";filename=\"" + realFilename + "\""), + RequestBody.create(MediaType.parse("image/png"), file) + + ).addFormDataPart("output", "json").build(); + RequestBody body = builder.build(); + Request request = new Request.Builder().url(gofastUrl).post(body).header("Expect", "100-continue").build(); + OkHttpClient.Builder okBuilder = new OkHttpClient.Builder(); + OkHttpClient client = okBuilder.connectTimeout(600, TimeUnit.MILLISECONDS) + .readTimeout(600, TimeUnit.SECONDS).build(); + Call call = client.newCall(request); + String html = ""; + Response response = null; + try { + response = call.execute(); + html = response.body().string(); + System.out.println("DownloadUtil" + html); + } catch (IOException e) { + log.info("upload fail:" + filePath); + e.printStackTrace(); + } finally { + response.close(); + } + file.delete(); + return html; + } + + public String getPrintSize(long size) { + BigDecimal bigDecimal = new BigDecimal(size); + BigDecimal bigDecimal1 = new BigDecimal(1024); + BigDecimal divide = bigDecimal.divide(bigDecimal1, 2, BigDecimal.ROUND_HALF_UP); + return divide.toString() + "KB"; + } + private String getTimelen(String timelen) { + int min = 0; + String index = "0"; + int two = 2; + String[] strs = timelen.split(":"); + if (strs[0].compareTo(index) > 0) { + + min += Integer.valueOf(strs[0]) * 60 * 60; + } + if (strs[1].compareTo(index) > 0) { + min += Integer.valueOf(strs[1]) * 60; + } + if (strs[two].compareTo(index) > 0) { + min += Math.round(Float.valueOf(strs[2])); + } + return String.valueOf(min); + } + + + public Map DownloadAndUploadImg(List imgList, Map resultData) { + String header = "header"; + Map hederMap = new HashMap<>(); + String cid = (String) resultData.get("cid"); + List imgPath = new ArrayList<>(); + List delimgList = new ArrayList<>(); + Map returnMap = new HashMap<>(32); + List imagePathSize = new ArrayList<>(); + Map contentimgs = new HashMap<>(32); + int imgTag = 1; + Integer downloadStatusCode = 0; + for (String url : imgList) { + if (StringUtil.checkDoubleHttp(url)) { + log.info("Download url have many http:" + url + ",cid:" + cid); + continue; + } + Map delFileMap = new HashMap(32); + Map imgMap = new HashMap(32); + Map imagePathMap = new HashMap(32); + if (formatJudgment(url)) { + downloadStatusCode = Constants.URL_FORMAT; + log.info("Download url splicing error url:" + url + ",cid:" + cid); + continue; + } + String[] split = url.split("###"); + String fileUrl = split[0]; + String fileType = split[1]; + try { + if (!StringUtil.hasValue(fileUrl)) { + downloadStatusCode = Constants.IMAGE_LIST_NULL; + continue; + } + File file = new File(downloadFilePath + getMd5(fileUrl).concat(fileType)); + if (resultData.containsKey(header)) { + hederMap = (Map) resultData.get(header); + downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType), hederMap); + } else { + downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType)); + if (!file.exists() || file.length() < 100) { + downLoadFileExeCommandForWGet(fileUrl, fileType); + } + } + + } catch (Exception e) { + downloadStatusCode = Constants.DOWNLOAD_IMAGE_FAIL; + log.info("Image download failed url:{},Exception:",url, e); + e.printStackTrace(); + continue; + } + String delimgUrl = ""; + String uploadimgUrl = ""; + try { + String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType); + JSONObject jsonObject = JSON.parseObject(uploadResultJson); + String md5 = jsonObject.getString("md5"); + delimgUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5; + uploadimgUrl = jsonObject.getString("url").replaceAll("\\?.*", "");; + Long size = jsonObject.getLong("size"); + imagePathMap.put("videoTime", ""); + try { + String imgWidthAndHeight = getImgWidthAndHeight(downloadFilePath + getMd5(fileUrl) + fileType); + String printSize = getPrintSize(size); + String src = jsonObject.getString("src"); + imagePathMap.put("resolution", imgWidthAndHeight); + imagePathMap.put("size", printSize); + imagePathMap.put("url", src); + } catch (Exception e) { + String printSize = getPrintSize(size); + String src = jsonObject.getString("src"); + imagePathMap.put("resolution", "100x100"); + imagePathMap.put("size", printSize); + imagePathMap.put("url", src); + log.error("Image size calculation failed url:{},Exception:",url, e); + } + } catch (Exception e) { + downloadStatusCode = Constants.UPLOAD_IMAGE_FAIL; + log.error("Image upload failed url:{},Exception:",url, e); + continue; + } + imgPath.add(uploadimgUrl); + delFileMap.put(uploadimgUrl, delimgUrl); + delimgList.add(delFileMap); + imgMap.put("img", fileUrl); + imgMap.put("rawimg", fileUrl); + imgMap.put("imgtag", "img" + imgTag); + imgMap.put("uploadImg", uploadimgUrl); + contentimgs.put("img" + imgTag, imgMap); + imagePathSize.add(imagePathMap); + imgTag++; + } + returnMap.put("contentimgs", contentimgs); + returnMap.put("imagePath", imgPath); + returnMap.put("delimgList", delimgList); + returnMap.put("downloadStatusCode", downloadStatusCode); + resultData.put("ErroeMessage", downloadStatusCode); + returnMap.put("imagePathSize", imagePathSize); + resultData.put("contentimgs", contentimgs); + resultData.put("imagePath", imgPath); + resultData.put("delimgList", delimgList); + resultData.put("imagePathSize", imagePathSize); + return returnMap; + } + + public Map DownloadAndUploadImgNoUa(List imgList, Map resultData) { + String cid = (String) resultData.get("cid"); + List imgPath = new ArrayList<>(); + List delimgList = new ArrayList<>(); + Map returnMap = new HashMap<>(32); + List imagePathSize = new ArrayList<>(); + Map contentimgs = new HashMap<>(32); + int imgTag = 1; + Integer downloadStatusCode = 0; + for (String url : imgList) { + if (StringUtil.checkDoubleHttp(url)) { + log.info("Download url have many http:" + url + ",cid:" + cid); + continue; + } + Map delFileMap = new HashMap(32); + Map imgMap = new HashMap(32); + Map imagePathMap = new HashMap(32); + if (formatJudgment(url)) { + downloadStatusCode = Constants.URL_FORMAT; + log.info("Download url splicing error url:" + url + ",cid:" + cid); + continue; + } + String[] split = url.split("###"); + String fileUrl = split[0]; + String fileType = split[1]; + try { + if (!StringUtil.hasValue(fileUrl)) { + downloadStatusCode = Constants.IMAGE_LIST_NULL; + continue; + } + downLoadFileExeCommandForWGetNoUa(fileUrl, fileType); + } catch (Exception e) { + downloadStatusCode = Constants.DOWNLOAD_IMAGE_FAIL; + log.info("Image download failed url:{},Exception:",url, e); + continue; + } + String delimgUrl = ""; + String uploadimgUrl = ""; + try { + String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType); + JSONObject jsonObject = JSON.parseObject(uploadResultJson); + String md5 = jsonObject.getString("md5"); + delimgUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5; + uploadimgUrl = jsonObject.getString("url").replaceAll("\\?.*", "");; + Long size = jsonObject.getLong("size"); + imagePathMap.put("videoTime", ""); + try { + String imgWidthAndHeight = getImgWidthAndHeight(downloadFilePath + getMd5(fileUrl) + fileType); + String printSize = getPrintSize(size); + String src = jsonObject.getString("src"); + imagePathMap.put("resolution", imgWidthAndHeight); + imagePathMap.put("size", printSize); + imagePathMap.put("url", src); + } catch (Exception e) { + String printSize = getPrintSize(size); + String src = jsonObject.getString("src"); + imagePathMap.put("resolution", "100x100"); + imagePathMap.put("size", printSize); + imagePathMap.put("url", src); + log.info("Image size calculation failed url:" + url + ",Exception:" + e); + } + } catch (Exception e) { + downloadStatusCode = Constants.UPLOAD_IMAGE_FAIL; + log.error("Image upload failed url:{},Exception:",url, e); + continue; + } + imgPath.add(uploadimgUrl); + delFileMap.put(uploadimgUrl, delimgUrl); + delimgList.add(delFileMap); + imgMap.put("img", fileUrl); + imgMap.put("rawimg", fileUrl); + imgMap.put("imgtag", "img" + imgTag); + imgMap.put("uploadImg", uploadimgUrl); + contentimgs.put("img" + imgTag, imgMap); + imagePathSize.add(imagePathMap); + imgTag++; + } + returnMap.put("contentimgs", contentimgs); + returnMap.put("imagePath", imgPath); + returnMap.put("delimgList", delimgList); + returnMap.put("downloadStatusCode", downloadStatusCode); + resultData.put("ErroeMessage", downloadStatusCode); + returnMap.put("imagePathSize", imagePathSize); + resultData.put("contentimgs", contentimgs); + resultData.put("imagePath", imgPath); + resultData.put("delimgList", delimgList); + resultData.put("imagePathSize", imagePathSize); + return returnMap; + } + public String getImgWidthAndHeight(String filePath) { + File file = new File(filePath); + InputStream is = null; + BufferedImage src = null; + int width = -1; + int height = -1; + try { + is = new FileInputStream(file); + src = javax.imageio.ImageIO.read(is); + width = src.getWidth(null); + height = src.getHeight(null); + is.close(); + } catch (IOException e) { + + } + return width + "x" + height; + } + private void downLoadFileExeCommandForWGetNoUa(String fileUrl, String filetype) { + String news_id = getMd5(fileUrl); + //wget --no-check-certificate -d --user-agent="Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0" http://gxj.gz.gov.cn/attachment/7/7194/7194190/8648699.pdf -O 1f07794340006d986a7d87684d3ade03.pdf + String command = "wget --no-check-certificate " + fileUrl + " -O " + downloadFilePath + news_id + filetype; + InputStream in = null; + BufferedReader read = null; + try { + log.info("wget 方法进入:" + command); + Process process; + try { + String line = ""; + process = Runtime.getRuntime().exec(command); + log.info("执行命令"); + log.info("视频下载命令:" + command); + in = process.getErrorStream(); + read = new BufferedReader(new InputStreamReader(in)); + while ((line = read.readLine()) != null) { + log.info("视频下载进度:" + line); + } + process.destroy(); + } catch (IOException var26) { + var26.printStackTrace(); + log.info("下载视频异常", var26); + } + } finally { + try { + in.close(); + read.close(); + } catch (IOException var25) { + var25.printStackTrace(); + } + } + } + + public Map DownloadAndUploadFile(List fileList, Map resultData) { + String header = "header"; + Map hederMap = new HashMap<>(); + List filePath = new ArrayList<>(); + List delFileList = new ArrayList<>(); + Map returnMap = new HashMap<>(32); + List forwardUrl = new ArrayList<>(); + List filePathSize = new ArrayList<>(); + Integer downloadStatusCode = 0; + for (String url : fileList) { + Map fileInfoMap = new HashMap(32); + Map delFileMap = new HashMap(32); + Map fileMap = new HashMap(32); + if (formatJudgment(url)) { + downloadStatusCode = Constants.URL_FORMAT; + continue; + } + String[] split = url.split("###"); + String fileUrl = split[0]; + String fileType = split[1]; + try { + if (!StringUtil.hasValue(fileUrl)) { + downloadStatusCode = Constants.IMAGE_LIST_NULL; + continue; + } + File file = new File(downloadFilePath + getMd5(fileUrl).concat(fileType)); + if (resultData.containsKey(header)) { + hederMap = (Map) resultData.get(header); + downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType), hederMap); + } else { + downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType)); + if (!file.exists() || file.length() < 100) { + downLoadFileExeCommandForWGet(fileUrl, fileType); + } + } + } catch (Exception e) { + downloadStatusCode = Constants.DOWNLOAD_FILE_FAIL; + log.error("File download failed url:{},Exception:",url,e); + continue; + } + String delFileUrl = ""; + String uploadFileUrl = ""; + try { + fileInfoMap.put("videoTime", ""); + fileInfoMap.put("resolution", ""); + String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType); + JSONObject jsonObject = JSON.parseObject(uploadResultJson); + String src = jsonObject.getString("src"); + String md5 = jsonObject.getString("md5"); + delFileUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5; + uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", "");; + Long size = jsonObject.getLong("size"); + String printSize = getPrintSize(size); + fileInfoMap.put("size", printSize); + fileInfoMap.put("url", src); + } catch (Exception e) { + downloadStatusCode = Constants.UPLOAD_FILE_FAIL; + log.info("File upload failed url:{},Exception:",url,e); + continue; + } + filePath.add(uploadFileUrl); + delFileMap.put(uploadFileUrl, delFileUrl); + delFileList.add(delFileMap); + fileMap.put("gofastUrl", uploadFileUrl); + fileMap.put("originalUrl", fileUrl); + forwardUrl.add(fileMap); + filePathSize.add(fileInfoMap); + } + returnMap.put("forwardUrl", forwardUrl); + returnMap.put("filePath", filePath); + returnMap.put("delFileList", delFileList); + returnMap.put("downloadStatusCode", downloadStatusCode); + resultData.put("ErroeMessage", downloadStatusCode); + returnMap.put("filePathSize", filePathSize); + resultData.put("forwardUrl", forwardUrl); + resultData.put("filePath", filePath); + resultData.put("delFileList", delFileList); + resultData.put("filePathSize", filePathSize); + return returnMap; + } + + public Map DownloadAndUploadFileNoUa(List fileList, Map resultData) { + List filePath = new ArrayList<>(); + List delFileList = new ArrayList<>(); + Map returnMap = new HashMap<>(32); + List forwardUrl = new ArrayList<>(); + List filePathSize = new ArrayList<>(); + Integer downloadStatusCode = 0; + for (String url : fileList) { + Map fileInfoMap = new HashMap(32); + Map delFileMap = new HashMap(32); + Map fileMap = new HashMap(32); + if (formatJudgment(url)) { + downloadStatusCode = Constants.URL_FORMAT; + continue; + } + String[] split = url.split("###"); + String fileUrl = split[0]; + String fileType = split[1]; + try { + if (!StringUtil.hasValue(fileUrl)) { + downloadStatusCode = Constants.IMAGE_LIST_NULL; + continue; + } + downLoadFileExeCommandForWGetNoUa(fileUrl, fileType); + } catch (Exception e) { + downloadStatusCode = Constants.DOWNLOAD_FILE_FAIL; + log.error("File download failed url:{},Exception:",url,e); + continue; + } + String delFileUrl = ""; + String uploadFileUrl = ""; + try { + fileInfoMap.put("videoTime", ""); + fileInfoMap.put("resolution", ""); + String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType); + JSONObject jsonObject = JSON.parseObject(uploadResultJson); + String src = jsonObject.getString("src"); + String md5 = jsonObject.getString("md5"); + delFileUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5; + uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", "");; + Long size = jsonObject.getLong("size"); + String printSize = getPrintSize(size); + fileInfoMap.put("size", printSize); + fileInfoMap.put("url", src); + } catch (Exception e) { + downloadStatusCode = Constants.UPLOAD_FILE_FAIL; + log.error("File upload failed url:{},Exception:",url, e); + e.printStackTrace(); + continue; + } + filePath.add(uploadFileUrl); + delFileMap.put(uploadFileUrl, delFileUrl); + delFileList.add(delFileMap); + fileMap.put("gofastUrl", uploadFileUrl); + fileMap.put("originalUrl", fileUrl); + forwardUrl.add(fileMap); + filePathSize.add(fileInfoMap); + } + returnMap.put("forwardUrl", forwardUrl); + returnMap.put("filePath", filePath); + returnMap.put("delFileList", delFileList); + returnMap.put("downloadStatusCode", downloadStatusCode); + resultData.put("ErroeMessage", downloadStatusCode); + returnMap.put("filePathSize", filePathSize); + resultData.put("forwardUrl", forwardUrl); + resultData.put("filePath", filePath); + resultData.put("delFileList", delFileList); + resultData.put("filePathSize", filePathSize); + return returnMap; + } +} diff --git a/src/main/java/com/bw/fileDownload/service/FileDownloadService.java b/src/main/java/com/bw/fileDownload/service/FileDownloadService.java new file mode 100644 index 0000000..d12e5f9 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/service/FileDownloadService.java @@ -0,0 +1,30 @@ +package com.bw.fileDownload.service; + +import java.util.Map; + +/** + * 各类型任务执行接口 + * @author jian.mao + * @date 2025年2月7日 + * @description + */ +public interface FileDownloadService { + + /** + * 视频类型任务执行逻辑接口 + * @param task + */ + public void video(Map task); + + /** + * 文档or图片执行逻辑接口 + * @param task + */ + public void docOrImg(Map task); + + /** + * 无附件类型执行逻辑接口 + * @param task + */ + public void notFile(Map task); +} diff --git a/src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java b/src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java new file mode 100644 index 0000000..1bd602a --- /dev/null +++ b/src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java @@ -0,0 +1,160 @@ +package com.bw.fileDownload.service.impl; + +import java.util.List; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import com.alibaba.fastjson.JSONObject; +import com.bw.fileDownload.components.SpringBootKafka; +import com.bw.fileDownload.entity.Constants; +import com.bw.fileDownload.service.DownloadExecService; +import com.bw.fileDownload.service.FileDownloadService; + +import lombok.extern.slf4j.Slf4j; + +/** + * 附件下载逻辑实现类 + * @author jian.mao + * @date 2025年2月6日 + * @description + */ +@Service +@Slf4j +public class FileDownloadServiceImpl implements FileDownloadService { + + @Autowired + private SpringBootKafka springBootKafka; + @Autowired + private DownloadExecService downloadExecService; + @Value("${kafka.producer.topic}") + private String topic; + @Override + public void video(Map task) { + //视频队列的任务不仅仅做视频下载,还需要判断是否进行图片和文档的下载 + Map parseData = (Map) task.get(Constants.PARSEDATA); + Map data = (Map) parseData.get(Constants.DATA); + String cid = (String) data.get(Constants.CID); + String url = (String) data.get(Constants.URL); + //获取视频链接 + List videoList = (List) data.get(Constants.VIDEOLIST); + //视频下载 + Map resultMap = downloadExecService.downloadAndUploadVidoAndAudio(videoList, data); + int downloadStatusCode = (int) resultMap.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("视频下载成功: url={},cid={}",url,cid); + } else { + log.info("视频下载失败: url={},cid={}",url,cid); + } + + + //判断文档和图片,包含进行下载操作 + List fileList = (List) data.get(Constants.FILELIST); + if(fileList != null && fileList.size() > 0) { + //有文档 + Map docRes = downloadExecService.DownloadAndUploadFile(fileList, data); + downloadStatusCode = (int) docRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("文档下载成功: url={},cid={}",url,cid); + } else { + log.warn("文档下载失败,尝试无ua请求下载----"); + docRes = downloadExecService.DownloadAndUploadFileNoUa(fileList, data); + downloadStatusCode = (int) docRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("文档下载成功: url={},cid={}",url,cid); + } else { + log.error("文档下载失败: url={},cid={}",url,cid); + } + } + } + List imgList = (List) data.get(Constants.IMGLIST); + if(imgList != null && imgList.size() > 0) { + //有图片 + Map imgRes = downloadExecService.DownloadAndUploadImg(imgList, data); + downloadStatusCode = (int) imgRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("download task success: url=" + url + ",cid=" + cid); + } else { + log.warn("图片下载失败,切换无ua方式下载---"); + imgRes = downloadExecService.DownloadAndUploadImgNoUa(imgList, data); + downloadStatusCode = (int) imgRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("图片下载成功: url={},cid={}",url,cid); + } else { + log.error("图片下载失败: url={},cid={}",url,cid); + } + } + } + //发送kafka,完成数据流闭环 + try { + springBootKafka.send(topic, JSONObject.toJSONString(task)); + } catch (Exception e) { + log.error("视频类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e); + } + } + @Override + public void docOrImg(Map task) { + //文档or图潘队列的任务 + Map parseData = (Map) task.get(Constants.PARSEDATA); + Map data = (Map) parseData.get(Constants.DATA); + String cid = (String) data.get(Constants.CID); + String url = (String) data.get(Constants.URL); + + + //判断文档和图片,包含进行下载操作 + List fileList = (List) data.get(Constants.FILELIST); + if(fileList != null && fileList.size() > 0) { + //有文档 + Map docRes = downloadExecService.DownloadAndUploadFile(fileList, data); + int downloadStatusCode = (int) docRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("文档下载成功: url={},cid={}",url,cid); + } else { + log.warn("文档下载失败,尝试无ua请求下载----"); + docRes = downloadExecService.DownloadAndUploadFileNoUa(fileList, data); + downloadStatusCode = (int) docRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("文档下载成功: url={},cid={}",url,cid); + } else { + log.error("文档下载失败: url={},cid={}",url,cid); + } + } + } + List imgList = (List) data.get(Constants.IMGLIST); + if(imgList != null && imgList.size() > 0) { + //有图片 + Map imgRes = downloadExecService.DownloadAndUploadImg(imgList, data); + int downloadStatusCode = (int) imgRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("download task success: url=" + url + ",cid=" + cid); + } else { + log.warn("图片下载失败,切换无ua方式下载---"); + imgRes = downloadExecService.DownloadAndUploadImgNoUa(imgList, data); + downloadStatusCode = (int) imgRes.get("downloadStatusCode"); + if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) { + log.info("图片下载成功: url={},cid={}",url,cid); + } else { + log.error("图片下载失败: url={},cid={}",url,cid); + } + } + } + //发送kafka,完成数据流闭环 + try { + springBootKafka.send(topic, JSONObject.toJSONString(task)); + } catch (Exception e) { + log.error("文档or图片类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e); + } + } + @Override + public void notFile(Map task) { + try { + //无附件不进行处理,直接发送kafka + springBootKafka.send(topic, JSONObject.toJSONString(task)); + } catch (Exception e) { + log.error("无附件类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e); + } + } + +} diff --git a/src/main/java/com/bw/fileDownload/utils/FileUtil.java b/src/main/java/com/bw/fileDownload/utils/FileUtil.java new file mode 100644 index 0000000..6609c6b --- /dev/null +++ b/src/main/java/com/bw/fileDownload/utils/FileUtil.java @@ -0,0 +1,30 @@ +package com.bw.fileDownload.utils; + +import java.io.FileWriter; +import java.io.IOException; + +/** + * 文件工具类 + * @author jian.mao + * @date 2023年7月14日 + * @description + */ +public class FileUtil { + + /** + * 数据写入文件 + * @param Path 文件路径 + * @param result 数据 + * @throws IOException + */ + public static void writeFile(String path,String result){ + try { + FileWriter fw = new FileWriter(path,true); + fw.write(result+"\n"); + fw.flush(); + fw.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/bw/fileDownload/utils/StringUtil.java b/src/main/java/com/bw/fileDownload/utils/StringUtil.java new file mode 100644 index 0000000..1efb393 --- /dev/null +++ b/src/main/java/com/bw/fileDownload/utils/StringUtil.java @@ -0,0 +1,122 @@ +package com.bw.fileDownload.utils; + +import java.security.MessageDigest; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author:jinming + * @className:StringUtil + * @version:1.0 + * @description: + * @Date:2023/3/6 14:33 + */ +@Slf4j +public class StringUtil { + public static boolean hasValue(String str) { + return str != null && !"".equals(str.trim()); + } + + public static String getRegexGroup(String regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Pattern p = Pattern.compile(regex); + Matcher m = p.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + } + + return resultStr; + } + + public static String getMd5(String string) { + try { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] bs = md5.digest(string.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(40); + for (byte x : bs) { + if ((x & 0xff) >> 4 == 0) { + sb.append("0").append(Integer.toHexString(x & 0xff)); + } else { + sb.append(Integer.toHexString(x & 0xff)); + } + } + return sb.toString(); + } catch (Exception e) { + //LOG.error("获取md5异常", e); + return "nceaform" + System.currentTimeMillis(); + } + } + + public static String removeAllHtmlTags(String str) { + return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : ""; + } + + public static String getRegexGroup(Pattern regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Matcher m = regex.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + log.error(regex + " parser error!"); + } + + return resultStr; + } + + public static String getStrByPattern(String str, String regex) { + Pattern pattern = Pattern.compile(regex); + Matcher m = pattern.matcher(str); + return m.find() ? m.group(0) : ""; + } + + public static Set getEmailAddress(String message) { + Set emailList = new HashSet<>(); + Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+"); + Matcher m = pattern.matcher(message); + while (m.find()) { + emailList.add(m.group(0)); + } + return emailList; + } + + public static boolean checkDoubleHttp(String input) { + int count = 0; + int index = 0; + + while (index < input.length()) { + index = input.indexOf("http://", index); + if (index == -1) { + break; + } + count++; + index += 4; + } + if (count >= 2) { + return true; + } + count = 0; + index = 0; + while (index < input.length()) { + index = input.indexOf("https://", index); + if (index == -1) { + break; + } + count++; + index += 4; + } + return count >= 2; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..675ee49 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,103 @@ +logging: + level: + root: info + path: ./logs +server: + port: 8019 + servlet: + context-path: /fileQueueSync + tomcat: + uri-encoding: utf-8 + max-connections: 20000 + max-http-form-post-size: 1 + max-threads: 1000 +spring: + application: + name: 附件补充下载服务 + kafka: + bootstrap-servers: node-01:19092,node-02:19092,node-03:19092 + producer: + retries: 0 + #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 + batch-size: 16384 + # 设置生产者内存缓冲区的大小。 + buffer-memory: 33554432 + # 键的序列化方式 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + # 值的序列化方式 + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + acks: 1 + consumer: + # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D + auto-commit-interval: 1S + # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) + # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 + auto-offset-reset: earliest + # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + enable-auto-commit: true + # 键的反序列化方式 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + # 值的反序列化方式 + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + #消费组 + group-id: test4 + #消费者并发线程数 + concurrency: 4 + #超时时间 + max-poll-interval-ms: 60000 + listener: + # 在侦听器容器中运行的线程数。 + #concurrency: 5 + #listner负责ack,每调用一次,就立即commit + #ack-mode: manual_immediate + missing-topics-fatal: false + boot: + admin: + client: + url: http://192.168.0.44:8001 + instance: + service-base-url: http://192.168.0.44:8019 + +management: + endpoints: + web: + exposure: + include: "*" + endpoint: + health: + show-details: always + health: + elasticsearch: + enabled: false + +kafka: + consumer: + topic: kafka08_news_parse_queue + producer: + topic: kafka08_news_fileSync_queue +#任务队列相关配置 +task: + dispatch-thread-num: 1 + results-thread-num: 1 + save-thread-num: 1 + queue-size: 10000 + video-task-queue-path: ../data/videoTaskQueue.txt + docOrImg-task-queue-path: ../data/docOrImgTaskQueue.txt + notFile-task-queue-path: ../data/notFileTaskQueue.txt +#线程池配置 +threadPool: + corePoolSize: 5 + maximumPoolSize: 20 + keepAliveTime: 60 + queueSize: 100 + +file: + download: + path: ../file + upload: + url: http://10.8.0.10:8081/group1/upload + diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..592d4c0 --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,36 @@ + + + + + + + + + true + + ${logging.level} + + + ${logging.path}/fileQueueSyncInfo.log + + + ${logging.path}/fileQueueSyncInfo.log.%d{yyyy-MM-dd} + 7 + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n + UTF-8 + + + + + + + + diff --git a/src/test/java/com/bw/AppTest.java b/src/test/java/com/bw/AppTest.java new file mode 100644 index 0000000..fc367fc --- /dev/null +++ b/src/test/java/com/bw/AppTest.java @@ -0,0 +1,38 @@ +package com.bw; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/target/.gitignore b/target/.gitignore new file mode 100644 index 0000000..4569837 --- /dev/null +++ b/target/.gitignore @@ -0,0 +1,2 @@ +/classes/ +/test-classes/