Browse Source

附件补充下载服务

master
maojian 5 months ago
commit
952a55339b
  1. 40
      .classpath
  2. 23
      .project
  3. 5
      .settings/org.eclipse.core.resources.prefs
  4. 9
      .settings/org.eclipse.jdt.core.prefs
  5. 4
      .settings/org.eclipse.m2e.core.prefs
  6. 221
      pom.xml
  7. 27
      src/main/java/com/bw/fileDownload/Application.java
  8. 38
      src/main/java/com/bw/fileDownload/cache/ConfigCache.java
  9. 46
      src/main/java/com/bw/fileDownload/components/SpringBootKafka.java
  10. 20
      src/main/java/com/bw/fileDownload/config/KafkaConfig.java
  11. 78
      src/main/java/com/bw/fileDownload/entity/Constants.java
  12. 63
      src/main/java/com/bw/fileDownload/entity/FileInfoPo.java
  13. 244
      src/main/java/com/bw/fileDownload/handler/MainHandler.java
  14. 76
      src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java
  15. 873
      src/main/java/com/bw/fileDownload/service/DownloadExecService.java
  16. 30
      src/main/java/com/bw/fileDownload/service/FileDownloadService.java
  17. 160
      src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java
  18. 30
      src/main/java/com/bw/fileDownload/utils/FileUtil.java
  19. 122
      src/main/java/com/bw/fileDownload/utils/StringUtil.java
  20. 103
      src/main/resources/application.yml
  21. 36
      src/main/resources/logback-spring.xml
  22. 38
      src/test/java/com/bw/AppTest.java
  23. 2
      target/.gitignore

40
.classpath

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

23
.project

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>FileQueueSync</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

5
.settings/org.eclipse.core.resources.prefs

@ -0,0 +1,5 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

9
.settings/org.eclipse.jdt.core.prefs

@ -0,0 +1,9 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.methodParameters=generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

4
.settings/org.eclipse.m2e.core.prefs

@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

221
pom.xml

@ -0,0 +1,221 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
<groupId>com.bw</groupId>
<artifactId>FileQueueSync</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>FileQueueSync</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--
https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.10.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!--
https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect -->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<version>1.6.21</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jsoup/jsoup -->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/p6spy/p6spy -->
<dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>
<build>
<!-- <pluginManagement> --><!-- lock down plugins versions to avoid using Maven defaults (may be
moved
to parent pom) -->
<plugins>
<!-- clean lifecycle, see
https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see
https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see
https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- spring-boot-maven-plugin插件就是打包spring boot应用的 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.bw.fileDownload.Application</mainClass>
<layout>ZIP</layout>
<includes>
<include>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<type>jar</type>
<includeTypes>jar</includeTypes>
<includeScope>runtime</includeScope>
<outputDirectory>${project.build.directory}/libs</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- </pluginManagement> -->
</build>
</project>

27
src/main/java/com/bw/fileDownload/Application.java

@ -0,0 +1,27 @@
package com.bw.fileDownload;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* 主入口
*
* @author jian.mao
* @date 2023年7月4日
* @description
*/
@SpringBootApplication
@EnableScheduling
@EnableKafka
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

38
src/main/java/com/bw/fileDownload/cache/ConfigCache.java

@ -0,0 +1,38 @@
package com.bw.fileDownload.cache;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author jian.mao
* @date 2022年11月11日
* @description 静态变量类
*/
@Slf4j
public class ConfigCache {
/**启动条件**/
public static boolean isStart = true;
/*****视频任务队列*****/
public static LinkedBlockingDeque<Map<String, Object>> videoTaskQueue = null;
/*****文件or图片任务队列*****/
public static LinkedBlockingDeque<Map<String, Object>> fileTaskQueue = null;
/*****无附件队列队列*****/
public static LinkedBlockingDeque<Map<String, Object>> notFileTaskQueue = null;
/**
* 队列录入任务
* @param queue
* @param task
*/
public static void putQueue(LinkedBlockingDeque<Map<String, Object>> queue,Map<String, Object> task){
//next app 写入队列准备调出
try {
queue.put(task);
} catch (InterruptedException e) {
log.error("队列写入data失败---");
}
}
}

46
src/main/java/com/bw/fileDownload/components/SpringBootKafka.java

@ -0,0 +1,46 @@
package com.bw.fileDownload.components;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @PROJECT_NAME: companybusinesscrawl
* @DESCRIPTION:SpringBootKafka 工具类
* @AUTHOR: ying.zhao
* @DATE: 2023/4/6 11:09
*/
@Slf4j
@Component
public class SpringBootKafka {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 自定义topicKafkaTemplate
*/
/**
* public static final String TOPIC = "companyBussTest";
**/
public void send(String topic, String message) {
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(topic + " - 生产者 发送消息成功" );
}
});
}
}

20
src/main/java/com/bw/fileDownload/config/KafkaConfig.java

@ -0,0 +1,20 @@
package com.bw.fileDownload.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* kafka配置类
* @author jian.mao
* @date 2023年7月6日
* @description
*/
@Configuration
public class KafkaConfig {
@Value("${kafka.consumer.topic}")
private String kafkaTopic;
public String getKafkaTopic() {
return kafkaTopic;
}
}

78
src/main/java/com/bw/fileDownload/entity/Constants.java

@ -0,0 +1,78 @@
package com.bw.fileDownload.entity;
/**
* 常量类
* @author jian.mao
* @date 2025年2月5日
* @description
*/
public class Constants {
/******采集引擎任务体常量key**********/
public final static String TASKDATA = "taskdata";
/******采集引擎下载体常量key**********/
public final static String SPIDERDATA = "spiderdata";
/******采集引擎解析体常量key**********/
public final static String PARSEDATA = "parsedata";
/******采集标签常量key**********/
public final static String PAGE_SWITCHS = "page_switchs";
/******文件下载表示常量key*******/
public static final String DOWNLOADFILE = "downloadFile";
/******视频下载表示常量key*******/
public static final String DOWNLOADVIDEO = "downloadVideo";
/******图片下载表示常量key*******/
public static final String DOWNLOADPIC = "downloadPic";
/******data常量******/
public static final String DATA = "data";
/******文件链接存储key******/
public static final String FILELIST = "fileList";
/******图片链接存储key******/
public static final String IMGLIST = "imgList";
/******视频链接存储key******/
public static final String VIDEOLIST = "videoList";
public static final String CID = "cid";
public static final String URL = "url";
public static String MSGQUEUENAME = "msgqueuename";
public static String VIDEO_PATH = "videoList";
public static String FILE_PATH = "fileList";
public static String IMAGE_PATH = "imgList";
public static Integer NO_DOWNLOAD_URL = 402001;
public static Integer NO_DOWNLOAD_TAG = 402002;
public static Integer DOWNLOAD_VIDEO_FAIL = 402003;
public static Integer DOWNLOAD_IMAGE_FAIL = 402004;
public static Integer DOWNLOAD_FILE_FAIL = 402005;
public static Integer UPLOAD_VIDEO_FAIL = 402006;
public static Integer UPLOAD_IMAGE_FAIL = 402007;
public static Integer UPLOAD_FILE_FAIL = 402008;
public static Integer VIDEO_LIST_NULL = 402009;
public static Integer IMAGE_LIST_NULL = 402010;
public static Integer FILE_LIST_NULL = 402011;
public static Integer REDIS_ERROR = 402012;
public static Integer URL_FORMAT = 402012;
public static Integer DOWNLOAD_STANDARD = 402013;
public static Integer DOWNLOAD_SUCCESS = 0;
public static Integer DOWNLOAD_NOT_IMAGE = 402014;
public static String DOWNLOAD_STATUS_CODE = "downloadStatusCode";
}

63
src/main/java/com/bw/fileDownload/entity/FileInfoPo.java

@ -0,0 +1,63 @@
package com.bw.fileDownload.entity;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author:jinming
* @className:FileInfoPo
* @version:1.0
* @description:
* @Date:2023/2/28 10:13
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FileInfoPo {
/**
* 文件原始Url
*/
private String fileUrl;
/**
* gofast下载url
*/
private String goFastUrl;
/**
* 文件名
*/
private String fileName;
/**
* 文件类型
*/
private String fileType;
/**
* 图片尺寸
*/
private String resolution;
/**
* 文件大小
*/
private String size;
/**
* 视频时长
*/
private String videoTime;
/**
* 文件得MD5值
*/
private String md5;
/**
* 文件的删除地址
*/
private String delUrl;
/**
* 文件的后缀地址
*/
private String src;
public String toJsonString() {
return JSON.toJSONString(this);
}
}

244
src/main/java/com/bw/fileDownload/handler/MainHandler.java

@ -0,0 +1,244 @@
package com.bw.fileDownload.handler;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bw.fileDownload.cache.ConfigCache;
import com.bw.fileDownload.service.FileDownloadService;
import com.bw.fileDownload.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
/**
* 执行入口
* @author jian.mao
* @date 2025年2月5日
* @description
*/
@Component
@Slf4j
public class MainHandler implements ApplicationRunner{
@Autowired
private FileDownloadService fileDownloadService;
/***线程池参数***/
@Value("${threadPool.corePoolSize}")
private int corePoolSize;
@Value("${threadPool.maximumPoolSize}")
private int maximumPoolSize;
@Value("${threadPool.keepAliveTime}")
private long keepAliveTime;
@Value("${threadPool.queueSize}")
private int threadQueueSize;
@Value("${task.queue-size}")
private Integer taskQueueSize;
@Value("${task.video-task-queue-path}")
private String videoTaskPath;
@Value("${task.docOrImg-task-queue-path}")
private String docOrImgTaskPath;
@Value("${task.notFile-task-queue-path}")
private String notFileTaskPath;
@Override
public void run(ApplicationArguments args) throws Exception {
//设置队列长度
if(taskQueueSize == null || taskQueueSize == 0){
taskQueueSize = 10000;
}
/*****视频任务队列*****/
ConfigCache.videoTaskQueue = new LinkedBlockingDeque<Map<String,Object>>(taskQueueSize);
/*****文档or图片任务队列*****/
ConfigCache.fileTaskQueue = new LinkedBlockingDeque<Map<String,Object>>(taskQueueSize);
/*****无附件任务队列*****/
ConfigCache.notFileTaskQueue = new LinkedBlockingDeque<Map<String,Object>>(taskQueueSize);
//视频线程池
ThreadPoolExecutor videoExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(threadQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//视频任务执行
Thread videoConsumer = new Thread(() -> {
while(ConfigCache.isStart) {
try {
Map<String, Object> task = ConfigCache.videoTaskQueue.take();
videoExecutor.execute(()-> videoDownloadExec(task));
} catch (Exception e) {
log.error("视频任务执行异常------e:",e);
}
}
});
videoConsumer.start();
log.info("视频任务执行线程启动------");
//文件or图片线程池
ThreadPoolExecutor docOrImgExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(threadQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//文件or图片任务执行
Thread docOrImgConsumer = new Thread(() -> {
while(ConfigCache.isStart) {
try {
Map<String, Object> task = ConfigCache.fileTaskQueue.take();
docOrImgExecutor.execute(()-> docOrImgDownloadExec(task));
} catch (Exception e) {
// TODO: handle exception
log.error("文件or图片任务执行异常------e:",e);
}
}
});
docOrImgConsumer.start();
log.info("文件图片任务执行线程启动-----");
//无附件线程池
ThreadPoolExecutor notFileExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(threadQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//无附件任务执行
Thread notFileConsumer = new Thread(() -> {
while(ConfigCache.isStart) {
try {
Map<String, Object> task = ConfigCache.notFileTaskQueue.take();
notFileExecutor.execute(()-> notFileDownloadExec(task));
} catch (Exception e) {
// TODO: handle exception
log.error("无附件任务执行异常------e:",e);
}
}
});
notFileConsumer.start();
log.info("无附件任务执行线程启动-----");
//启动加载缓存任务
readTask(videoTaskPath, ConfigCache.videoTaskQueue);
readTask(docOrImgTaskPath, ConfigCache.fileTaskQueue);
readTask(notFileTaskPath, ConfigCache.notFileTaskQueue);
//停止处理
waitDown();
}
private void videoDownloadExec(Map<String, Object> task) {
fileDownloadService.video(task);
}
private void docOrImgDownloadExec(Map<String, Object> task) {
fileDownloadService.docOrImg(task);
}
private void notFileDownloadExec(Map<String, Object> task) {
fileDownloadService.notFile(task);
}
/****************************************************************load******************************************************************************/
/**
* 加载文件中的任务
* @param path 文件地址
* @param queue 队列
*/
@SuppressWarnings("unchecked")
public static void readTask(String path, LinkedBlockingDeque<Map<String, Object>> queue) {
File file = new File(path);
if (file.exists()) {
List<String> tasks = null;
try {
tasks = FileUtils.readLines(file, "UTF-8");
} catch (IOException e) {
e.printStackTrace();
}
for (String taskStr : tasks) {
Map<String, Object> task = JSONObject.parseObject(taskStr);
try {
queue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
file.delete();
}
}
/*******************************************************************stop************************************************************************/
/**
* 结束触发钩子
*/
public void waitDown() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// 停止线程
ConfigCache.isStart = false;
log.info("stop-------");
writeTsskToFile();
}
});
}
/**
* 任务持久化到硬盘
*/
public void writeTsskToFile() {
while (true) {
if (ConfigCache.videoTaskQueue.size() > 0) {
try {
Map<String, Object> task = ConfigCache.videoTaskQueue.take();
FileUtil.writeFile(videoTaskPath ,JSONObject.toJSONString(task));
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.info("taskQueue write is file end");
break;
}
}
while (true) {
if (ConfigCache.fileTaskQueue.size() > 0) {
try {
Map<String, Object> task = ConfigCache.fileTaskQueue.take();
FileUtil.writeFile(docOrImgTaskPath, JSONObject.toJSONString(task));
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.info("taskQueue write is file end");
break;
}
}
while (true) {
if (ConfigCache.notFileTaskQueue.size() > 0) {
try {
Map<String, Object> task = ConfigCache.notFileTaskQueue.take();
FileUtil.writeFile(notFileTaskPath, JSONObject.toJSONString(task));
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.info("taskQueue write is file end");
break;
}
}
}
}

76
src/main/java/com/bw/fileDownload/process/TaskDistributionProcess.java

@ -0,0 +1,76 @@
package com.bw.fileDownload.process;
import java.util.List;
import java.util.Map;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.bw.fileDownload.cache.ConfigCache;
import com.bw.fileDownload.entity.Constants;
import lombok.extern.slf4j.Slf4j;
/**
* 任务读取并分配
* @author jian.mao
* @date 2025年2月5日
* @description
*/
@Component
@Slf4j
public class TaskDistributionProcess {
@KafkaListener(topics = "#{kafkaConfig.getKafkaTopic()}")
public void consumeMessage(String message) {
Map<String, Object> result = JSONObject.parseObject(message);
taskSplit(result);
}
/**
* 任务拆分文件or图片视频无附件
* @param task
*/
@SuppressWarnings("unchecked")
private void taskSplit(Map<String, Object> task) {
//先判断下载标签
Map<String, Object> taskdata = (Map<String, Object>) task.get(Constants.TASKDATA);
Map<String, Object> pageSwitchs = (Map<String, Object>) taskdata.get(Constants.PAGE_SWITCHS);
Map<String, Object> parsedata = (Map<String, Object>) task.get(Constants.PARSEDATA);
//解析后的数据体
Map<String, Object> data = (Map<String, Object>) parsedata.get(Constants.DATA);
//判断视频标识
if(pageSwitchs.containsKey(Constants.DOWNLOADVIDEO) && (boolean)pageSwitchs.get(Constants.DOWNLOADVIDEO)) {
//获取所需下载文件链接
List<String> videList = (List<String>) data.get(Constants.VIDEOLIST);
if(videList != null && videList.size() > 0) {
//有视频链接
ConfigCache.putQueue(ConfigCache.videoTaskQueue, task);
return;
}
}
//判断图片标识
if(pageSwitchs.containsKey(Constants.DOWNLOADPIC) && (boolean)pageSwitchs.get(Constants.DOWNLOADPIC)) {
//获取所需下载文件链接
List<String> imgList = (List<String>) data.get(Constants.IMGLIST);
if(imgList != null && imgList.size() > 0) {
//有图片链接
ConfigCache.putQueue(ConfigCache.fileTaskQueue, task);
return;
}
}
//判断文件标识
if(pageSwitchs.containsKey(Constants.DOWNLOADFILE) && (boolean)pageSwitchs.get(Constants.DOWNLOADFILE)) {
//获取所需下载文件链接
List<String> fileList = (List<String>) data.get(Constants.FILELIST);
if(fileList != null && fileList.size() > 0) {
//有文件链接
ConfigCache.putQueue(ConfigCache.fileTaskQueue, task);
return;
}
}
//即没视频又没文件
ConfigCache.putQueue(ConfigCache.notFileTaskQueue, task);
}
}

873
src/main/java/com/bw/fileDownload/service/DownloadExecService.java

@ -0,0 +1,873 @@
package com.bw.fileDownload.service;
import java.awt.image.BufferedImage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.fileDownload.entity.Constants;
import com.bw.fileDownload.entity.FileInfoPo;
import com.bw.fileDownload.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
@Component
@Slf4j
public class DownloadExecService {
@Value("${file.download.path}")
private String downloadFilePath;
@Value("${file.upload.url}")
private String gofastUrl;
private static final String IMAGE_TYPE_JPG = "jpg";
private static final String IMAGE_TYPE_JPEG = "jpeg";
private static final String IMAGE_TYPE_PNG = "png";
private static OkHttpClient client = new OkHttpClient();
public Map downloadAndUploadVidoAndAudio(List<String> videoList, Map<String, Object> resultData) {
String header = "header";
Map<String, String> hederMap = new HashMap<>();
List<String> videoPath = new ArrayList<>();
List<Map> delVideoAndAudioList = new ArrayList<>();
Map<String, Object> returnMap = new HashMap<>(32);
List<Map> videoUrl = new ArrayList<>();
List<Map> videoPathSize = new ArrayList<>();
Integer downloadStatusCode = 0;
for (String url : videoList) {
Map delVideoAndAudioMap = new HashMap(32);
Map videoMap = new HashMap(32);
Map videoPathMap = new HashMap(32);
if (formatJudgment(url)) {
downloadStatusCode = Constants.URL_FORMAT;
continue;
}
String[] split = url.split("###");
String videoType = split[1];
String fileType = videoType;
String m3u8Type = ".m3u8";
String youtubeTypeFr = "youtube";
String youtubeTypeSec = "youtu.be";
String needDownloadVideoUrl = split[0];
try {
if (!StringUtil.hasValue(needDownloadVideoUrl)) {
downloadStatusCode = Constants.VIDEO_LIST_NULL;
continue;
}
if (m3u8Type.equals(fileType)) {
downLoadVideoExeCommandForFfmpeg(needDownloadVideoUrl);
} else if (url.contains(youtubeTypeFr) || url.contains(youtubeTypeSec)) {
downLoadVideoExeCommandLp(needDownloadVideoUrl, videoType);
} else {
File file = new File(downloadFilePath + getMd5(needDownloadVideoUrl).concat(fileType));
if (resultData.containsKey(header)) {
hederMap = (Map<String, String>) resultData.get(header);
downloadFile(needDownloadVideoUrl, downloadFilePath, getMd5(needDownloadVideoUrl).concat(fileType), hederMap);
} else {
downloadFile(needDownloadVideoUrl, downloadFilePath, getMd5(needDownloadVideoUrl).concat(fileType));
if (!file.exists() || file.length() < 100) {
downLoadFileExeCommandForWGet(needDownloadVideoUrl, fileType);
}
}
}
} catch (Exception e) {
downloadStatusCode = Constants.DOWNLOAD_VIDEO_FAIL;
log.error("Video or audio download failed url:{},Exception:",url,e);
continue;
}
String delVideoAndAudioUrl = "";
String uploadFileUrl = "";
try {
Map<String, String> videoTimeMap = getVideoTime(downloadFilePath + getMd5(needDownloadVideoUrl) + videoType);
String videoTime = videoTimeMap.get("videoTime");
String resolution = videoTimeMap.get("resolution");
videoPathMap.put("videoTime", videoTime);
videoPathMap.put("resolution", resolution);
String uploadResultJson = "";
if (m3u8Type.equals(fileType)) {
uploadResultJson = upLoadFile(downloadFilePath + getMd5(needDownloadVideoUrl) + ".mp4");
} else {
uploadResultJson = upLoadFile(downloadFilePath + getMd5(needDownloadVideoUrl) + videoType);
}
JSONObject jsonObject = JSON.parseObject(uploadResultJson);
String md5 = jsonObject.getString("md5");
delVideoAndAudioUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5;
uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", "");
String src = jsonObject.getString("src");
Long size = jsonObject.getLong("size");
String printSize = getPrintSize(size);
videoPathMap.put("size", printSize);
videoPathMap.put("url", src);
} catch (Exception e) {
downloadStatusCode = Constants.UPLOAD_VIDEO_FAIL;
log.error("Video upload failed url:{},Exception:",url,e);
continue;
}
videoPath.add(uploadFileUrl);
delVideoAndAudioMap.put(uploadFileUrl, delVideoAndAudioUrl);
delVideoAndAudioList.add(delVideoAndAudioMap);
videoMap.put("originalUrl", needDownloadVideoUrl);
videoMap.put("gofastUrl", uploadFileUrl);
videoUrl.add(videoMap);
videoPathSize.add(videoPathMap);
}
returnMap.put("videoUrl", videoUrl);
returnMap.put("videoPath", videoPath);
returnMap.put("delVideoAndAudioList", delVideoAndAudioList);
returnMap.put("downloadStatusCode", downloadStatusCode);
resultData.put("ErroeMessage", downloadStatusCode);
returnMap.put("videoPathSize", videoPathSize);
resultData.put("videoUrl", videoUrl);
resultData.put("videoPath", videoPath);
resultData.put("delVideoAndAudioList", delVideoAndAudioList);
resultData.put("videoPathSize", videoPathSize);
return returnMap;
}
private boolean formatJudgment(String url) {
String formatTag = "###";
if (!url.contains(formatTag)) {
return true;
}
return false;
}
private void downLoadVideoExeCommandForFfmpeg(String videoUrl) {
String news_id = getMd5(videoUrl);
String command = "ffmpeg -i " + videoUrl + " " + downloadFilePath
+ news_id + ".mp4";
InputStream in = null;
BufferedReader read = null;
String Qavg = "Qavg:";
try {
System.out.println("DownloadUtil" + command);
Process process;
try {
String line = "";
File file = new File(downloadFilePath);
if (!file.exists()) {
file.mkdirs();
}
process = Runtime.getRuntime().exec(command);
in = process.getErrorStream();
read = new BufferedReader(new InputStreamReader(in));
while ((line = read.readLine()) != null) {
System.out.println("DownloadUtil" + line);
if (line.contains(Qavg)) {
break;
}
}
process.destroy();
} catch (IOException var26) {
var26.printStackTrace();
}
} finally {
try {
in.close();
read.close();
} catch (IOException var25) {
var25.printStackTrace();
}
}
}
private void downLoadVideoExeCommandLp(String videoUrl, String videoType) {
String command = "";
List<String> stringList = doGetVideoIdCommand(videoUrl);
for (String s : stringList) {
String videoId = s.split(" ")[0];
command = "yt_dlp --no-check-certificate -f " + videoId + " -o " + downloadFilePath + getMd5(videoUrl) + videoType + " " + videoUrl;
log.info("视频下载命令:" + command);
break;
}
InputStream in = null;
BufferedReader read = null;
try {
Process process;
try {
String line = "";
File file = new File(downloadFilePath);
if (!file.exists()) {
file.mkdirs();
}
process = Runtime.getRuntime().exec(command, (String[]) null, file);
in = process.getInputStream();
read = new BufferedReader(new InputStreamReader(in));
while ((line = read.readLine()) != null) {
System.out.println(line);
}
process.destroy();
} catch (IOException var26) {
var26.printStackTrace();
}
} finally {
try {
in.close();
read.close();
} catch (IOException var25) {
var25.printStackTrace();
}
}
}
private void downLoadFileExeCommandForWGet(String fileUrl, String filetype) {
String news_id = getMd5(fileUrl);
String htpps = "https";
String command = "wget -d --user-agent=\"Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0\" " + fileUrl + " -O " + downloadFilePath + news_id + filetype;
if (!fileUrl.contains(htpps)) {
command = "wget --no-check-certificate -d --user-agent=\"Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0\" " + fileUrl + " -O " + downloadFilePath + news_id + filetype;
}
InputStream in = null;
BufferedReader read = null;
try {
log.info("wget 方法进入:" + command);
Process process;
try {
String line = "";
process = Runtime.getRuntime().exec(command);
log.info("执行命令");
log.info("视频下载命令:" + command);
in = process.getErrorStream();
read = new BufferedReader(new InputStreamReader(in));
while ((line = read.readLine()) != null) {
log.info("视频下载进度:" + line);
}
process.destroy();
} catch (IOException var26) {
var26.printStackTrace();
log.info("下载视频异常", var26);
}
} finally {
try {
in.close();
read.close();
} catch (IOException var25) {
var25.printStackTrace();
}
}
}
public String getMd5(String string) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] bs = md5.digest(string.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder(40);
for (byte x : bs) {
if ((x & 0xff) >> 4 == 0) {
sb.append("0").append(Integer.toHexString(x & 0xff));
} else {
sb.append(Integer.toHexString(x & 0xff));
}
}
return sb.toString();
} catch (Exception e) {
log.info("get md 5 exception:", e);
return "nceaform" + System.currentTimeMillis();
}
}
private List<String> doGetVideoIdCommand(String videoUrl) {
List resultList = new ArrayList();
String command = "yt_dlp --no-check-certificate -F " + videoUrl;
InputStream in = null;
BufferedReader read = null;
try {
Process process;
try {
String line = "";
process = Runtime.getRuntime().exec(command, (String[]) null);
in = process.getInputStream();
read = new BufferedReader(new InputStreamReader(in, "gbk"));
while ((line = read.readLine()) != null) {
System.out.println(line);
if (line.contains("mp4")) {
if (line.contains("256x144") && !line.contains("video only")) {
resultList.add(line);
break;
} else if (line.contains("640x360") && !line.contains("video only")) {
resultList.add(line);
break;
} else if (line.contains("854x480") && !line.contains("video only")) {
resultList.add(line);
break;
} else if (line.contains("1280x720") && !line.contains("video only")) {
resultList.add(line);
break;
}
}
}
process.destroy();
} catch (IOException var26) {
var26.printStackTrace();
}
} finally {
try {
in.close();
read.close();
} catch (IOException var25) {
var25.printStackTrace();
}
}
return resultList;
}
public void downloadFile(String fileUrl, String savePath, String customFileName, Map<String, String>... headers) throws IOException {
okhttp3.Request.Builder builder = new okhttp3.Request.Builder();
if (headers != null && headers.length > 0) {
Map<String, String> tempHeaders = headers[0];
Iterator<String> iterator = tempHeaders.keySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
builder.addHeader(key, tempHeaders.get(key));
}
} else {
builder.addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7").addHeader("Accept-Language", "zh-CN,zh;q=0.9").addHeader("Cache-Control", "no-cache").addHeader("Connection", "keep-alive").addHeader("Pragma", "no-cache").addHeader("Sec-Fetch-Dest", "document").addHeader("Sec-Fetch-Mode", "navigate").addHeader("Sec-Fetch-Site", "none").addHeader("Sec-Fetch-User", "?1").addHeader("Upgrade-Insecure-Requests", "1").addHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36").addHeader("sec-ch-ua", "\"Google Chrome\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"").addHeader("sec-ch-ua-mobile", "?0").addHeader("sec-ch-ua-platform", "\"Windows\"");
}
Request request = builder.url(fileUrl).get().build();
Call call = client.newCall(request);
Response response = call.execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
ResponseBody responseBody = response.body();
if (responseBody != null) {
InputStream inputStream = responseBody.byteStream();
File fileDirectory = new File(savePath);
if (!fileDirectory.exists()) {
fileDirectory.mkdirs();
}
File file = new File(fileDirectory, customFileName);
FileOutputStream outputStream = new FileOutputStream(file);
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
outputStream.flush();
outputStream.close();
inputStream.close();
}
}
private Map<String, String> getVideoTime(String video_path) {
Map infoMap = new HashMap(32);
String videoTime = "";
String resolution = "";
String command = "ffmpeg -i " + video_path;
InputStream in = null;
BufferedReader read = null;
try {
String info = "";
String line = "";
System.out.println("视频时间解析命令" + command);
Process process;
process = Runtime.getRuntime().exec(command);
in = process.getErrorStream();
read = new BufferedReader(new InputStreamReader(in));
while ((line = read.readLine()) != null) {
info = info + line;
}
process.destroy();
String regexDuration = "Duration: (.*?), start: (.*?), bitrate: (\\d*) kb\\/s";
Pattern pattern = Pattern.compile(regexDuration);
Matcher m = pattern.matcher(info);
if (m.find()) {
videoTime = getTimelen(m.group(1)) + "s";
System.out.println("视频时长:" + videoTime + "s , 开始时间:" + m.group(2) + ", 比特率:" + m.group(3) + "kb/s");
}
String regexVideo = "Video: (.*?), (.*?), (\\d+x\\d+)[,\\s]";
pattern = Pattern.compile(regexVideo);
m = pattern.matcher(info);
if (m.find()) {
resolution = m.group(3);
System.out.println("编码格式:" + m.group(1) + ", 视频格式:" + m.group(2) + ", 分辨率:" + resolution + "kb/s");
}
} catch (Exception e) {
e.printStackTrace();
}
infoMap.put("videoTime", videoTime);
infoMap.put("resolution", resolution);
return infoMap;
}
private String upLoadFile(String filePath) {
File file = new File(filePath);
String realFilename = filePath.substring(filePath.lastIndexOf(File.separator) + 1);
MultipartBody.Builder builder = new MultipartBody.Builder().setType(MultipartBody.FORM);
builder.addPart(Headers.of("Content-Disposition", "form-data; name=\"file\";filename=\"" + realFilename + "\""),
RequestBody.create(MediaType.parse("image/png"), file)
).addFormDataPart("output", "json").build();
RequestBody body = builder.build();
Request request = new Request.Builder().url(gofastUrl).post(body).header("Expect", "100-continue").build();
OkHttpClient.Builder okBuilder = new OkHttpClient.Builder();
OkHttpClient client = okBuilder.connectTimeout(600, TimeUnit.MILLISECONDS)
.readTimeout(600, TimeUnit.SECONDS).build();
Call call = client.newCall(request);
String html = "";
Response response = null;
try {
response = call.execute();
html = response.body().string();
System.out.println("DownloadUtil" + html);
} catch (IOException e) {
log.info("upload fail:" + filePath);
e.printStackTrace();
} finally {
response.close();
}
file.delete();
return html;
}
public String getPrintSize(long size) {
BigDecimal bigDecimal = new BigDecimal(size);
BigDecimal bigDecimal1 = new BigDecimal(1024);
BigDecimal divide = bigDecimal.divide(bigDecimal1, 2, BigDecimal.ROUND_HALF_UP);
return divide.toString() + "KB";
}
private String getTimelen(String timelen) {
int min = 0;
String index = "0";
int two = 2;
String[] strs = timelen.split(":");
if (strs[0].compareTo(index) > 0) {
min += Integer.valueOf(strs[0]) * 60 * 60;
}
if (strs[1].compareTo(index) > 0) {
min += Integer.valueOf(strs[1]) * 60;
}
if (strs[two].compareTo(index) > 0) {
min += Math.round(Float.valueOf(strs[2]));
}
return String.valueOf(min);
}
public Map DownloadAndUploadImg(List<String> imgList, Map<String, Object> resultData) {
String header = "header";
Map<String, String> hederMap = new HashMap<>();
String cid = (String) resultData.get("cid");
List<String> imgPath = new ArrayList<>();
List<Map> delimgList = new ArrayList<>();
Map<String, Object> returnMap = new HashMap<>(32);
List<Map> imagePathSize = new ArrayList<>();
Map contentimgs = new HashMap<>(32);
int imgTag = 1;
Integer downloadStatusCode = 0;
for (String url : imgList) {
if (StringUtil.checkDoubleHttp(url)) {
log.info("Download url have many http:" + url + ",cid:" + cid);
continue;
}
Map delFileMap = new HashMap(32);
Map imgMap = new HashMap(32);
Map imagePathMap = new HashMap(32);
if (formatJudgment(url)) {
downloadStatusCode = Constants.URL_FORMAT;
log.info("Download url splicing error url:" + url + ",cid:" + cid);
continue;
}
String[] split = url.split("###");
String fileUrl = split[0];
String fileType = split[1];
try {
if (!StringUtil.hasValue(fileUrl)) {
downloadStatusCode = Constants.IMAGE_LIST_NULL;
continue;
}
File file = new File(downloadFilePath + getMd5(fileUrl).concat(fileType));
if (resultData.containsKey(header)) {
hederMap = (Map<String, String>) resultData.get(header);
downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType), hederMap);
} else {
downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType));
if (!file.exists() || file.length() < 100) {
downLoadFileExeCommandForWGet(fileUrl, fileType);
}
}
} catch (Exception e) {
downloadStatusCode = Constants.DOWNLOAD_IMAGE_FAIL;
log.info("Image download failed url:{},Exception:",url, e);
e.printStackTrace();
continue;
}
String delimgUrl = "";
String uploadimgUrl = "";
try {
String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType);
JSONObject jsonObject = JSON.parseObject(uploadResultJson);
String md5 = jsonObject.getString("md5");
delimgUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5;
uploadimgUrl = jsonObject.getString("url").replaceAll("\\?.*", "");;
Long size = jsonObject.getLong("size");
imagePathMap.put("videoTime", "");
try {
String imgWidthAndHeight = getImgWidthAndHeight(downloadFilePath + getMd5(fileUrl) + fileType);
String printSize = getPrintSize(size);
String src = jsonObject.getString("src");
imagePathMap.put("resolution", imgWidthAndHeight);
imagePathMap.put("size", printSize);
imagePathMap.put("url", src);
} catch (Exception e) {
String printSize = getPrintSize(size);
String src = jsonObject.getString("src");
imagePathMap.put("resolution", "100x100");
imagePathMap.put("size", printSize);
imagePathMap.put("url", src);
log.error("Image size calculation failed url:{},Exception:",url, e);
}
} catch (Exception e) {
downloadStatusCode = Constants.UPLOAD_IMAGE_FAIL;
log.error("Image upload failed url:{},Exception:",url, e);
continue;
}
imgPath.add(uploadimgUrl);
delFileMap.put(uploadimgUrl, delimgUrl);
delimgList.add(delFileMap);
imgMap.put("img", fileUrl);
imgMap.put("rawimg", fileUrl);
imgMap.put("imgtag", "img" + imgTag);
imgMap.put("uploadImg", uploadimgUrl);
contentimgs.put("img" + imgTag, imgMap);
imagePathSize.add(imagePathMap);
imgTag++;
}
returnMap.put("contentimgs", contentimgs);
returnMap.put("imagePath", imgPath);
returnMap.put("delimgList", delimgList);
returnMap.put("downloadStatusCode", downloadStatusCode);
resultData.put("ErroeMessage", downloadStatusCode);
returnMap.put("imagePathSize", imagePathSize);
resultData.put("contentimgs", contentimgs);
resultData.put("imagePath", imgPath);
resultData.put("delimgList", delimgList);
resultData.put("imagePathSize", imagePathSize);
return returnMap;
}
public Map DownloadAndUploadImgNoUa(List<String> imgList, Map<String, Object> resultData) {
String cid = (String) resultData.get("cid");
List<String> imgPath = new ArrayList<>();
List<Map> delimgList = new ArrayList<>();
Map<String, Object> returnMap = new HashMap<>(32);
List<Map> imagePathSize = new ArrayList<>();
Map contentimgs = new HashMap<>(32);
int imgTag = 1;
Integer downloadStatusCode = 0;
for (String url : imgList) {
if (StringUtil.checkDoubleHttp(url)) {
log.info("Download url have many http:" + url + ",cid:" + cid);
continue;
}
Map delFileMap = new HashMap(32);
Map imgMap = new HashMap(32);
Map imagePathMap = new HashMap(32);
if (formatJudgment(url)) {
downloadStatusCode = Constants.URL_FORMAT;
log.info("Download url splicing error url:" + url + ",cid:" + cid);
continue;
}
String[] split = url.split("###");
String fileUrl = split[0];
String fileType = split[1];
try {
if (!StringUtil.hasValue(fileUrl)) {
downloadStatusCode = Constants.IMAGE_LIST_NULL;
continue;
}
downLoadFileExeCommandForWGetNoUa(fileUrl, fileType);
} catch (Exception e) {
downloadStatusCode = Constants.DOWNLOAD_IMAGE_FAIL;
log.info("Image download failed url:{},Exception:",url, e);
continue;
}
String delimgUrl = "";
String uploadimgUrl = "";
try {
String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType);
JSONObject jsonObject = JSON.parseObject(uploadResultJson);
String md5 = jsonObject.getString("md5");
delimgUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5;
uploadimgUrl = jsonObject.getString("url").replaceAll("\\?.*", "");;
Long size = jsonObject.getLong("size");
imagePathMap.put("videoTime", "");
try {
String imgWidthAndHeight = getImgWidthAndHeight(downloadFilePath + getMd5(fileUrl) + fileType);
String printSize = getPrintSize(size);
String src = jsonObject.getString("src");
imagePathMap.put("resolution", imgWidthAndHeight);
imagePathMap.put("size", printSize);
imagePathMap.put("url", src);
} catch (Exception e) {
String printSize = getPrintSize(size);
String src = jsonObject.getString("src");
imagePathMap.put("resolution", "100x100");
imagePathMap.put("size", printSize);
imagePathMap.put("url", src);
log.info("Image size calculation failed url:" + url + ",Exception:" + e);
}
} catch (Exception e) {
downloadStatusCode = Constants.UPLOAD_IMAGE_FAIL;
log.error("Image upload failed url:{},Exception:",url, e);
continue;
}
imgPath.add(uploadimgUrl);
delFileMap.put(uploadimgUrl, delimgUrl);
delimgList.add(delFileMap);
imgMap.put("img", fileUrl);
imgMap.put("rawimg", fileUrl);
imgMap.put("imgtag", "img" + imgTag);
imgMap.put("uploadImg", uploadimgUrl);
contentimgs.put("img" + imgTag, imgMap);
imagePathSize.add(imagePathMap);
imgTag++;
}
returnMap.put("contentimgs", contentimgs);
returnMap.put("imagePath", imgPath);
returnMap.put("delimgList", delimgList);
returnMap.put("downloadStatusCode", downloadStatusCode);
resultData.put("ErroeMessage", downloadStatusCode);
returnMap.put("imagePathSize", imagePathSize);
resultData.put("contentimgs", contentimgs);
resultData.put("imagePath", imgPath);
resultData.put("delimgList", delimgList);
resultData.put("imagePathSize", imagePathSize);
return returnMap;
}
public String getImgWidthAndHeight(String filePath) {
File file = new File(filePath);
InputStream is = null;
BufferedImage src = null;
int width = -1;
int height = -1;
try {
is = new FileInputStream(file);
src = javax.imageio.ImageIO.read(is);
width = src.getWidth(null);
height = src.getHeight(null);
is.close();
} catch (IOException e) {
}
return width + "x" + height;
}
private void downLoadFileExeCommandForWGetNoUa(String fileUrl, String filetype) {
String news_id = getMd5(fileUrl);
//wget --no-check-certificate -d --user-agent="Mozilla/5.0 (Windows NT xy; rv:10.0) Gecko/20100101 Firefox/10.0" http://gxj.gz.gov.cn/attachment/7/7194/7194190/8648699.pdf -O 1f07794340006d986a7d87684d3ade03.pdf
String command = "wget --no-check-certificate " + fileUrl + " -O " + downloadFilePath + news_id + filetype;
InputStream in = null;
BufferedReader read = null;
try {
log.info("wget 方法进入:" + command);
Process process;
try {
String line = "";
process = Runtime.getRuntime().exec(command);
log.info("执行命令");
log.info("视频下载命令:" + command);
in = process.getErrorStream();
read = new BufferedReader(new InputStreamReader(in));
while ((line = read.readLine()) != null) {
log.info("视频下载进度:" + line);
}
process.destroy();
} catch (IOException var26) {
var26.printStackTrace();
log.info("下载视频异常", var26);
}
} finally {
try {
in.close();
read.close();
} catch (IOException var25) {
var25.printStackTrace();
}
}
}
public Map DownloadAndUploadFile(List<String> fileList, Map<String, Object> resultData) {
String header = "header";
Map<String, String> hederMap = new HashMap<>();
List<String> filePath = new ArrayList<>();
List<Map> delFileList = new ArrayList<>();
Map<String, Object> returnMap = new HashMap<>(32);
List<Map> forwardUrl = new ArrayList<>();
List<Map> filePathSize = new ArrayList<>();
Integer downloadStatusCode = 0;
for (String url : fileList) {
Map fileInfoMap = new HashMap(32);
Map delFileMap = new HashMap(32);
Map fileMap = new HashMap(32);
if (formatJudgment(url)) {
downloadStatusCode = Constants.URL_FORMAT;
continue;
}
String[] split = url.split("###");
String fileUrl = split[0];
String fileType = split[1];
try {
if (!StringUtil.hasValue(fileUrl)) {
downloadStatusCode = Constants.IMAGE_LIST_NULL;
continue;
}
File file = new File(downloadFilePath + getMd5(fileUrl).concat(fileType));
if (resultData.containsKey(header)) {
hederMap = (Map<String, String>) resultData.get(header);
downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType), hederMap);
} else {
downloadFile(fileUrl, downloadFilePath, getMd5(fileUrl).concat(fileType));
if (!file.exists() || file.length() < 100) {
downLoadFileExeCommandForWGet(fileUrl, fileType);
}
}
} catch (Exception e) {
downloadStatusCode = Constants.DOWNLOAD_FILE_FAIL;
log.error("File download failed url:{},Exception:",url,e);
continue;
}
String delFileUrl = "";
String uploadFileUrl = "";
try {
fileInfoMap.put("videoTime", "");
fileInfoMap.put("resolution", "");
String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType);
JSONObject jsonObject = JSON.parseObject(uploadResultJson);
String src = jsonObject.getString("src");
String md5 = jsonObject.getString("md5");
delFileUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5;
uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", "");;
Long size = jsonObject.getLong("size");
String printSize = getPrintSize(size);
fileInfoMap.put("size", printSize);
fileInfoMap.put("url", src);
} catch (Exception e) {
downloadStatusCode = Constants.UPLOAD_FILE_FAIL;
log.info("File upload failed url:{},Exception:",url,e);
continue;
}
filePath.add(uploadFileUrl);
delFileMap.put(uploadFileUrl, delFileUrl);
delFileList.add(delFileMap);
fileMap.put("gofastUrl", uploadFileUrl);
fileMap.put("originalUrl", fileUrl);
forwardUrl.add(fileMap);
filePathSize.add(fileInfoMap);
}
returnMap.put("forwardUrl", forwardUrl);
returnMap.put("filePath", filePath);
returnMap.put("delFileList", delFileList);
returnMap.put("downloadStatusCode", downloadStatusCode);
resultData.put("ErroeMessage", downloadStatusCode);
returnMap.put("filePathSize", filePathSize);
resultData.put("forwardUrl", forwardUrl);
resultData.put("filePath", filePath);
resultData.put("delFileList", delFileList);
resultData.put("filePathSize", filePathSize);
return returnMap;
}
public Map DownloadAndUploadFileNoUa(List<String> fileList, Map<String, Object> resultData) {
List<String> filePath = new ArrayList<>();
List<Map> delFileList = new ArrayList<>();
Map<String, Object> returnMap = new HashMap<>(32);
List<Map> forwardUrl = new ArrayList<>();
List<Map> filePathSize = new ArrayList<>();
Integer downloadStatusCode = 0;
for (String url : fileList) {
Map fileInfoMap = new HashMap(32);
Map delFileMap = new HashMap(32);
Map fileMap = new HashMap(32);
if (formatJudgment(url)) {
downloadStatusCode = Constants.URL_FORMAT;
continue;
}
String[] split = url.split("###");
String fileUrl = split[0];
String fileType = split[1];
try {
if (!StringUtil.hasValue(fileUrl)) {
downloadStatusCode = Constants.IMAGE_LIST_NULL;
continue;
}
downLoadFileExeCommandForWGetNoUa(fileUrl, fileType);
} catch (Exception e) {
downloadStatusCode = Constants.DOWNLOAD_FILE_FAIL;
log.error("File download failed url:{},Exception:",url,e);
continue;
}
String delFileUrl = "";
String uploadFileUrl = "";
try {
fileInfoMap.put("videoTime", "");
fileInfoMap.put("resolution", "");
String uploadResultJson = upLoadFile(downloadFilePath + getMd5(fileUrl) + fileType);
JSONObject jsonObject = JSON.parseObject(uploadResultJson);
String src = jsonObject.getString("src");
String md5 = jsonObject.getString("md5");
delFileUrl = gofastUrl.replace("/upload", "") + "/delete?md5=" + md5;
uploadFileUrl = jsonObject.getString("url").replaceAll("\\?.*", "");;
Long size = jsonObject.getLong("size");
String printSize = getPrintSize(size);
fileInfoMap.put("size", printSize);
fileInfoMap.put("url", src);
} catch (Exception e) {
downloadStatusCode = Constants.UPLOAD_FILE_FAIL;
log.error("File upload failed url:{},Exception:",url, e);
e.printStackTrace();
continue;
}
filePath.add(uploadFileUrl);
delFileMap.put(uploadFileUrl, delFileUrl);
delFileList.add(delFileMap);
fileMap.put("gofastUrl", uploadFileUrl);
fileMap.put("originalUrl", fileUrl);
forwardUrl.add(fileMap);
filePathSize.add(fileInfoMap);
}
returnMap.put("forwardUrl", forwardUrl);
returnMap.put("filePath", filePath);
returnMap.put("delFileList", delFileList);
returnMap.put("downloadStatusCode", downloadStatusCode);
resultData.put("ErroeMessage", downloadStatusCode);
returnMap.put("filePathSize", filePathSize);
resultData.put("forwardUrl", forwardUrl);
resultData.put("filePath", filePath);
resultData.put("delFileList", delFileList);
resultData.put("filePathSize", filePathSize);
return returnMap;
}
}

30
src/main/java/com/bw/fileDownload/service/FileDownloadService.java

@ -0,0 +1,30 @@
package com.bw.fileDownload.service;
import java.util.Map;
/**
* 各类型任务执行接口
* @author jian.mao
* @date 2025年2月7日
* @description
*/
public interface FileDownloadService {
/**
* 视频类型任务执行逻辑接口
* @param task
*/
public void video(Map<String, Object> task);
/**
* 文档or图片执行逻辑接口
* @param task
*/
public void docOrImg(Map<String, Object> task);
/**
* 无附件类型执行逻辑接口
* @param task
*/
public void notFile(Map<String, Object> task);
}

160
src/main/java/com/bw/fileDownload/service/impl/FileDownloadServiceImpl.java

@ -0,0 +1,160 @@
package com.bw.fileDownload.service.impl;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.bw.fileDownload.components.SpringBootKafka;
import com.bw.fileDownload.entity.Constants;
import com.bw.fileDownload.service.DownloadExecService;
import com.bw.fileDownload.service.FileDownloadService;
import lombok.extern.slf4j.Slf4j;
/**
* 附件下载逻辑实现类
* @author jian.mao
* @date 2025年2月6日
* @description
*/
@Service
@Slf4j
public class FileDownloadServiceImpl implements FileDownloadService {
@Autowired
private SpringBootKafka springBootKafka;
@Autowired
private DownloadExecService downloadExecService;
@Value("${kafka.producer.topic}")
private String topic;
@Override
public void video(Map<String, Object> task) {
//视频队列的任务不仅仅做视频下载还需要判断是否进行图片和文档的下载
Map<String, Object> parseData = (Map<String, Object>) task.get(Constants.PARSEDATA);
Map<String, Object> data = (Map<String, Object>) parseData.get(Constants.DATA);
String cid = (String) data.get(Constants.CID);
String url = (String) data.get(Constants.URL);
//获取视频链接
List<String> videoList = (List<String>) data.get(Constants.VIDEOLIST);
//视频下载
Map<String, Object> resultMap = downloadExecService.downloadAndUploadVidoAndAudio(videoList, data);
int downloadStatusCode = (int) resultMap.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("视频下载成功: url={},cid={}",url,cid);
} else {
log.info("视频下载失败: url={},cid={}",url,cid);
}
//判断文档和图片包含进行下载操作
List<String> fileList = (List<String>) data.get(Constants.FILELIST);
if(fileList != null && fileList.size() > 0) {
//有文档
Map docRes = downloadExecService.DownloadAndUploadFile(fileList, data);
downloadStatusCode = (int) docRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("文档下载成功: url={},cid={}",url,cid);
} else {
log.warn("文档下载失败,尝试无ua请求下载----");
docRes = downloadExecService.DownloadAndUploadFileNoUa(fileList, data);
downloadStatusCode = (int) docRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("文档下载成功: url={},cid={}",url,cid);
} else {
log.error("文档下载失败: url={},cid={}",url,cid);
}
}
}
List<String> imgList = (List<String>) data.get(Constants.IMGLIST);
if(imgList != null && imgList.size() > 0) {
//有图片
Map imgRes = downloadExecService.DownloadAndUploadImg(imgList, data);
downloadStatusCode = (int) imgRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("download task success: url=" + url + ",cid=" + cid);
} else {
log.warn("图片下载失败,切换无ua方式下载---");
imgRes = downloadExecService.DownloadAndUploadImgNoUa(imgList, data);
downloadStatusCode = (int) imgRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("图片下载成功: url={},cid={}",url,cid);
} else {
log.error("图片下载失败: url={},cid={}",url,cid);
}
}
}
//发送kafka完成数据流闭环
try {
springBootKafka.send(topic, JSONObject.toJSONString(task));
} catch (Exception e) {
log.error("视频类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e);
}
}
@Override
public void docOrImg(Map<String, Object> task) {
//文档or图潘队列的任务
Map<String, Object> parseData = (Map<String, Object>) task.get(Constants.PARSEDATA);
Map<String, Object> data = (Map<String, Object>) parseData.get(Constants.DATA);
String cid = (String) data.get(Constants.CID);
String url = (String) data.get(Constants.URL);
//判断文档和图片包含进行下载操作
List<String> fileList = (List<String>) data.get(Constants.FILELIST);
if(fileList != null && fileList.size() > 0) {
//有文档
Map docRes = downloadExecService.DownloadAndUploadFile(fileList, data);
int downloadStatusCode = (int) docRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("文档下载成功: url={},cid={}",url,cid);
} else {
log.warn("文档下载失败,尝试无ua请求下载----");
docRes = downloadExecService.DownloadAndUploadFileNoUa(fileList, data);
downloadStatusCode = (int) docRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("文档下载成功: url={},cid={}",url,cid);
} else {
log.error("文档下载失败: url={},cid={}",url,cid);
}
}
}
List<String> imgList = (List<String>) data.get(Constants.IMGLIST);
if(imgList != null && imgList.size() > 0) {
//有图片
Map imgRes = downloadExecService.DownloadAndUploadImg(imgList, data);
int downloadStatusCode = (int) imgRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("download task success: url=" + url + ",cid=" + cid);
} else {
log.warn("图片下载失败,切换无ua方式下载---");
imgRes = downloadExecService.DownloadAndUploadImgNoUa(imgList, data);
downloadStatusCode = (int) imgRes.get("downloadStatusCode");
if (downloadStatusCode == Constants.DOWNLOAD_SUCCESS) {
log.info("图片下载成功: url={},cid={}",url,cid);
} else {
log.error("图片下载失败: url={},cid={}",url,cid);
}
}
}
//发送kafka完成数据流闭环
try {
springBootKafka.send(topic, JSONObject.toJSONString(task));
} catch (Exception e) {
log.error("文档or图片类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e);
}
}
@Override
public void notFile(Map<String, Object> task) {
try {
//无附件不进行处理直接发送kafka
springBootKafka.send(topic, JSONObject.toJSONString(task));
} catch (Exception e) {
log.error("无附件类型任务发送失败。task:{},异常:{}",JSONObject.toJSON(task),e);
}
}
}

30
src/main/java/com/bw/fileDownload/utils/FileUtil.java

@ -0,0 +1,30 @@
package com.bw.fileDownload.utils;
import java.io.FileWriter;
import java.io.IOException;
/**
* 文件工具类
* @author jian.mao
* @date 2023年7月14日
* @description
*/
public class FileUtil {
/**
* 数据写入文件
* @param Path 文件路径
* @param result 数据
* @throws IOException
*/
public static void writeFile(String path,String result){
try {
FileWriter fw = new FileWriter(path,true);
fw.write(result+"\n");
fw.flush();
fw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

122
src/main/java/com/bw/fileDownload/utils/StringUtil.java

@ -0,0 +1,122 @@
package com.bw.fileDownload.utils;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
/**
* @author:jinming
* @className:StringUtil
* @version:1.0
* @description:
* @Date:2023/3/6 14:33
*/
@Slf4j
public class StringUtil {
public static boolean hasValue(String str) {
return str != null && !"".equals(str.trim());
}
public static String getRegexGroup(String regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
}
return resultStr;
}
public static String getMd5(String string) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] bs = md5.digest(string.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder(40);
for (byte x : bs) {
if ((x & 0xff) >> 4 == 0) {
sb.append("0").append(Integer.toHexString(x & 0xff));
} else {
sb.append(Integer.toHexString(x & 0xff));
}
}
return sb.toString();
} catch (Exception e) {
//LOG.error("获取md5异常", e);
return "nceaform" + System.currentTimeMillis();
}
}
public static String removeAllHtmlTags(String str) {
return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : "";
}
public static String getRegexGroup(Pattern regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Matcher m = regex.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
log.error(regex + " parser error!");
}
return resultStr;
}
public static String getStrByPattern(String str, String regex) {
Pattern pattern = Pattern.compile(regex);
Matcher m = pattern.matcher(str);
return m.find() ? m.group(0) : "";
}
public static Set<String> getEmailAddress(String message) {
Set<String> emailList = new HashSet<>();
Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+");
Matcher m = pattern.matcher(message);
while (m.find()) {
emailList.add(m.group(0));
}
return emailList;
}
public static boolean checkDoubleHttp(String input) {
int count = 0;
int index = 0;
while (index < input.length()) {
index = input.indexOf("http://", index);
if (index == -1) {
break;
}
count++;
index += 4;
}
if (count >= 2) {
return true;
}
count = 0;
index = 0;
while (index < input.length()) {
index = input.indexOf("https://", index);
if (index == -1) {
break;
}
count++;
index += 4;
}
return count >= 2;
}
}

103
src/main/resources/application.yml

@ -0,0 +1,103 @@
logging:
level:
root: info
path: ./logs
server:
port: 8019
servlet:
context-path: /fileQueueSync
tomcat:
uri-encoding: utf-8
max-connections: 20000
max-http-form-post-size: 1
max-threads: 1000
spring:
application:
name: 附件补充下载服务
kafka:
bootstrap-servers: node-01:19092,node-02:19092,node-03:19092
producer:
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: true
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#消费组
group-id: test4
#消费者并发线程数
concurrency: 4
#超时时间
max-poll-interval-ms: 60000
listener:
# 在侦听器容器中运行的线程数。
#concurrency: 5
#listner负责ack,每调用一次,就立即commit
#ack-mode: manual_immediate
missing-topics-fatal: false
boot:
admin:
client:
url: http://192.168.0.44:8001
instance:
service-base-url: http://192.168.0.44:8019
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
health:
elasticsearch:
enabled: false
kafka:
consumer:
topic: kafka08_news_parse_queue
producer:
topic: kafka08_news_fileSync_queue
#任务队列相关配置
task:
dispatch-thread-num: 1
results-thread-num: 1
save-thread-num: 1
queue-size: 10000
video-task-queue-path: ../data/videoTaskQueue.txt
docOrImg-task-queue-path: ../data/docOrImgTaskQueue.txt
notFile-task-queue-path: ../data/notFileTaskQueue.txt
#线程池配置
threadPool:
corePoolSize: 5
maximumPoolSize: 20
keepAliveTime: 60
queueSize: 100
file:
download:
path: ../file
upload:
url: http://10.8.0.10:8081/group1/upload

36
src/main/resources/logback-spring.xml

@ -0,0 +1,36 @@
<configuration>
<!-- 属性文件:在properties文件中找到对应的配置项 -->
<springProperty scope="context" name="logging.path" source="logging.path"/>
<springProperty scope="context" name="logging.level" source="logging.level.com.bfd"/>
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 -->
<!-- <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</Pattern>
</encoder>
</appender> -->
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.path}/fileQueueSyncInfo.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.path}/fileQueueSyncInfo.log.%d{yyyy-MM-dd}</FileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<!-- <appender-ref ref="STDOUT"/> -->
</root>
</configuration>

38
src/test/java/com/bw/AppTest.java

@ -0,0 +1,38 @@
package com.bw;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}

2
target/.gitignore

@ -0,0 +1,2 @@
/classes/
/test-classes/
Loading…
Cancel
Save