You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
package com.bw.convert.handler;
import java.io.File;import java.io.IOException;import java.util.List;import java.util.Map;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;import com.bw.convert.cache.ConfigCache;import com.bw.convert.service.ConvertTaskService;import com.bw.convert.utils.DateUtil;import com.bw.convert.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
/** * @author jian.mao * @date 2025年1月13日 * @description */@Component@Order(value = 1)@Slf4jpublic class MainHandler implements ApplicationRunner {
@Value("${task.task-queue-path}") private String taskPath; @Value("${task.result-task-queue-path}") private String resultTaskPath; @Autowired private ConvertTaskService convertTaskService; /***线程池参数***/ @Value("${threadPool.corePoolSize}") private int corePoolSize; @Value("${threadPool.maximumPoolSize}") private int maximumPoolSize; @Value("${threadPool.keepAliveTime}") private long keepAliveTime; @Value("${threadPool.queueSize}") private int queueSize; /** *执行入口 */ @Override public void run(ApplicationArguments args) throws Exception { //线程池方式
ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.CallerRunsPolicy() ); //消费创建任务队列数据
Thread consumerThread = new Thread(() -> { while (true) { try { // 从队列中获取任务
Map<String, Object> task = ConfigCache.taskQueue.take(); log.info("创建任务----:{}",JSONObject.toJSONString(task)); // 提交给线程池执行
executor.execute(() -> createTask(task)); } catch (InterruptedException e) { // 恢复中断状态
Thread.currentThread().interrupt(); log.error("任务消费线程被中断"); break; } } }); consumerThread.start(); log.info("创建任务消费线程启动-----"); //消费结果任务队列数据
Thread resultConsumerThread = new Thread(() -> { while (true) { try { // 从队列中获取任务
Map<String, Object> task = ConfigCache.resultQueue.take(); log.info("获取结果任务----:{}",JSONObject.toJSONString(task)); // 提交给线程池执行
executor.execute(() -> getResult(task)); } catch (InterruptedException e) { // 恢复中断状态
Thread.currentThread().interrupt(); log.error("任务消费线程被中断"); break; } DateUtil.sleep(3000); } }); resultConsumerThread.start(); log.info("结果任务消费线程启动-----"); //启动加载缓存任务
readTask(taskPath, ConfigCache.taskQueue); readTask(resultTaskPath, ConfigCache.resultQueue); //停止处理
waitDown(); }
/** * 创建任务执行方法 * @param task */ private void createTask(Map<String, Object> task) { convertTaskService.create(task); } private void getResult(Map<String, Object> task) { convertTaskService.parse(task); } /****************************************************************load******************************************************************************/ /** * 加载文件中的任务 * @param path 文件地址 * @param queue 队列 */ @SuppressWarnings("unchecked") public static void readTask(String path, LinkedBlockingDeque<Map<String, Object>> queue) { File file = new File(path); if (file.exists()) { List<String> tasks = null; try { tasks = FileUtils.readLines(file, "UTF-8"); } catch (IOException e) { e.printStackTrace(); } for (String taskStr : tasks) { Map<String, Object> task = JSONObject.parseObject(taskStr); try { queue.put(task); } catch (InterruptedException e) { e.printStackTrace(); } } file.delete(); } }
/*******************************************************************stop************************************************************************/ /** * 结束触发钩子 */ public void waitDown() { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // 停止线程
ConfigCache.isStart = false; log.info("stop-------"); writeTsskToFile(); } }); }
/** * 任务持久化到硬盘 */ public void writeTsskToFile() { while (true) { if (ConfigCache.taskQueue.size() > 0) { try { Map<String, Object> task = ConfigCache.taskQueue.take(); FileUtil.writeFile(taskPath, JSONObject.toJSONString(task)); } catch (InterruptedException e) { e.printStackTrace(); } } else { log.info("taskQueue write is file end"); break; } } while (true) { if (ConfigCache.resultQueue.size() > 0) { try { Map<String, Object> task = ConfigCache.resultQueue.take(); FileUtil.writeFile(resultTaskPath, JSONObject.toJSONString(task)); } catch (InterruptedException e) { e.printStackTrace(); } } else { log.info("taskQueue write is file end"); break; } } }}
|