opai服务管理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

167 lines
5.1 KiB

  1. package com.bw.translate.handler;
  2. import java.io.File;
  3. import java.io.IOException;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.concurrent.LinkedBlockingDeque;
  7. import java.util.concurrent.LinkedBlockingQueue;
  8. import java.util.concurrent.ThreadPoolExecutor;
  9. import java.util.concurrent.TimeUnit;
  10. import org.apache.commons.io.FileUtils;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.boot.ApplicationArguments;
  14. import org.springframework.boot.ApplicationRunner;
  15. import org.springframework.cloud.context.config.annotation.RefreshScope;
  16. import org.springframework.core.annotation.Order;
  17. import org.springframework.stereotype.Component;
  18. import com.alibaba.fastjson.JSONObject;
  19. import com.bw.translate.cache.ConfigCache;
  20. import com.bw.translate.service.TranslateTaskService;
  21. import com.bw.translate.utils.FileUtil;
  22. import lombok.extern.slf4j.Slf4j;
  23. /**
  24. * @author jian.mao
  25. * @date 2025年1月13日
  26. * @description
  27. */
  28. @Component
  29. @Order(value = 1)
  30. @RefreshScope
  31. @Slf4j
  32. public class MainHandler implements ApplicationRunner {
  33. @Value("${task.task-queue-path}")
  34. private String taskPath;
  35. @Autowired
  36. private TranslateTaskService translateTaskService;
  37. /***线程池参数***/
  38. @Value("${threadPool.corePoolSize}")
  39. private int corePoolSize;
  40. @Value("${threadPool.maximumPoolSize}")
  41. private int maximumPoolSize;
  42. @Value("${threadPool.keepAliveTime}")
  43. private long keepAliveTime;
  44. @Value("${threadPool.queueSize}")
  45. private int queueSize;
  46. /**
  47. *执行入口
  48. */
  49. @Override
  50. public void run(ApplicationArguments args) throws Exception {
  51. //线程池方式
  52. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  53. corePoolSize,
  54. maximumPoolSize,
  55. keepAliveTime,
  56. TimeUnit.SECONDS,
  57. new LinkedBlockingQueue<>(queueSize),
  58. new ThreadPoolExecutor.CallerRunsPolicy()
  59. );
  60. //消费创建任务队列数据
  61. Thread consumerThread = new Thread(() -> {
  62. while (true) {
  63. try {
  64. // 从队列中获取任务
  65. Map<String, Object> task = ConfigCache.taskQueue.take();
  66. // 提交给线程池执行
  67. executor.execute(() -> translateTask(task));
  68. } catch (InterruptedException e) {
  69. // 恢复中断状态
  70. Thread.currentThread().interrupt();
  71. log.error("任务消费线程被中断");
  72. break;
  73. }
  74. }
  75. });
  76. consumerThread.start();
  77. log.info("任务消费线程启动-----");
  78. //启动加载缓存任务
  79. readTask(taskPath, ConfigCache.taskQueue);
  80. //停止处理
  81. waitDown();
  82. }
  83. /**
  84. * 创建任务执行方法
  85. * @param task
  86. */
  87. private void translateTask(Map<String, Object> task) {
  88. translateTaskService.translate(task);
  89. }
  90. /****************************************************************load******************************************************************************/
  91. /**
  92. * 加载文件中的任务
  93. * @param path 文件地址
  94. * @param queue 队列
  95. */
  96. @SuppressWarnings("unchecked")
  97. public static void readTask(String path, LinkedBlockingDeque<Map<String, Object>> queue) {
  98. File file = new File(path);
  99. if (file.exists()) {
  100. List<String> tasks = null;
  101. try {
  102. tasks = FileUtils.readLines(file, "UTF-8");
  103. } catch (IOException e) {
  104. e.printStackTrace();
  105. }
  106. for (String taskStr : tasks) {
  107. Map<String, Object> task = JSONObject.parseObject(taskStr);
  108. try {
  109. queue.put(task);
  110. } catch (InterruptedException e) {
  111. e.printStackTrace();
  112. }
  113. }
  114. file.delete();
  115. }
  116. }
  117. /*******************************************************************stop************************************************************************/
  118. /**
  119. * 结束触发钩子
  120. */
  121. public void waitDown() {
  122. Runtime.getRuntime().addShutdownHook(new Thread() {
  123. @Override
  124. public void run() {
  125. // 停止线程
  126. ConfigCache.isStart = false;
  127. log.info("stop-------");
  128. writeTsskToFile();
  129. }
  130. });
  131. }
  132. /**
  133. * 任务持久化到硬盘
  134. */
  135. public void writeTsskToFile() {
  136. while (true) {
  137. if (ConfigCache.taskQueue.size() > 0) {
  138. try {
  139. Map<String, Object> task = ConfigCache.taskQueue.take();
  140. FileUtil.writeFile(taskPath, JSONObject.toJSONString(task));
  141. } catch (InterruptedException e) {
  142. e.printStackTrace();
  143. }
  144. } else {
  145. log.info("taskQueue write is file end");
  146. break;
  147. }
  148. }
  149. }
  150. }