From f5ca987ed9204ece7067eb89bbd05ecbd0fcca52 Mon Sep 17 00:00:00 2001 From: maojian <550076202@qq.com> Date: Wed, 30 Apr 2025 15:30:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Edeepseek=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E6=8E=A5=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/bw/qanda/cache/ConfigCache.java | 3 +- .../com/bw/qanda/controller/QandaController.java | 11 ++ .../java/com/bw/qanda/handler/MainHandler.java | 38 ++++++ .../com/bw/qanda/service/QandATaskService.java | 5 + .../java/com/bw/qanda/service/QandaService.java | 6 + .../qanda/service/impl/QandATaskServiceImpl.java | 139 +++++++++++++++++++++ .../bw/qanda/service/impl/QandaServiceImpl.java | 33 ++++- src/main/java/com/bw/qanda/utils/DownLoadUtil.java | 2 +- .../com/bw/qanda/utils/GPTResultParseUtil.java | 10 +- src/main/resources/application.yml | 4 + 10 files changed, 245 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/bw/qanda/cache/ConfigCache.java b/src/main/java/com/bw/qanda/cache/ConfigCache.java index de69f0c..37dca34 100644 --- a/src/main/java/com/bw/qanda/cache/ConfigCache.java +++ b/src/main/java/com/bw/qanda/cache/ConfigCache.java @@ -17,8 +17,7 @@ public class ConfigCache { public static boolean isStart = true; /*****任务队列*****/ public static LinkedBlockingDeque> taskQueue = new LinkedBlockingDeque>(); - public static LinkedBlockingDeque> videoCommitTaskQueue = new LinkedBlockingDeque>(); - public static LinkedBlockingDeque> videoResultTaskQueue = new LinkedBlockingDeque>(); + public static LinkedBlockingDeque> localTaskQueue = new LinkedBlockingDeque>(); /** diff --git a/src/main/java/com/bw/qanda/controller/QandaController.java b/src/main/java/com/bw/qanda/controller/QandaController.java index f0f6d89..202e37d 100644 --- a/src/main/java/com/bw/qanda/controller/QandaController.java +++ b/src/main/java/com/bw/qanda/controller/QandaController.java @@ -33,6 +33,17 @@ public class QandaController { return response; } + /** + * 本地模型调用 + * @param dataJson + * @return + */ + @PostMapping("/putLocalQuestion") + @ResponseBody + public String putLocalQuestion(@RequestBody String dataJson){ + String response = qandaService.putLocalQuestion(dataJson); + return response; + } @RequestMapping(value = "/hello", method = RequestMethod.GET) @ResponseBody public String hello(String param, String token) { diff --git a/src/main/java/com/bw/qanda/handler/MainHandler.java b/src/main/java/com/bw/qanda/handler/MainHandler.java index c9325bc..492cc69 100644 --- a/src/main/java/com/bw/qanda/handler/MainHandler.java +++ b/src/main/java/com/bw/qanda/handler/MainHandler.java @@ -43,6 +43,8 @@ public class MainHandler implements ApplicationRunner { @Value("${task.task-queue-path}") private String taskPath; + @Value("${task.local-task-queue-path}") + private String localTaskPath; @Resource private QandATaskService qandATaskService; @Resource @@ -95,8 +97,28 @@ public class MainHandler implements ApplicationRunner { }); textConsumerThread.start(); log.info("问答模型任务消费线程启动-----"); + //消费文本翻译任务队列数据 + Thread localTextConsumerThread = new Thread(() -> { + while (true) { + try { + log.info("本地模型任务队列长度:{}",ConfigCache.localTaskQueue.size()); + // 从队列中获取任务 + Map task = ConfigCache.localTaskQueue.take(); + // 提交给线程池执行 + executor.execute(() -> localQandAExec(task)); + } catch (InterruptedException e) { + // 恢复中断状态 + Thread.currentThread().interrupt(); + log.error("创建任务消费线程被中断"); + break; + } + } + }); + localTextConsumerThread.start(); + log.info("本地问答模型任务消费线程启动-----"); //加载任务 readTask(taskPath, ConfigCache.taskQueue); + readTask(localTaskPath, ConfigCache.localTaskQueue); //钩子拉起 waitDown(); } @@ -104,6 +126,9 @@ public class MainHandler implements ApplicationRunner { public void qandAExec(Map task) { qandATaskService.qandA(task); } + public void localQandAExec(Map task) { + qandATaskService.localQandA(task); + } @SuppressWarnings("unchecked") public static void readTask(String path, LinkedBlockingDeque> queue) { @@ -160,5 +185,18 @@ public class MainHandler implements ApplicationRunner { break; } } + while (true) { + if (ConfigCache.localTaskQueue.size() > 0) { + try { + Map task = ConfigCache.localTaskQueue.take(); + FileUtil.writeFile(localTaskPath, JSONObject.toJSONString(task)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + log.info("taskQueue write is file end"); + break; + } + } } } diff --git a/src/main/java/com/bw/qanda/service/QandATaskService.java b/src/main/java/com/bw/qanda/service/QandATaskService.java index ededa14..74f5e09 100644 --- a/src/main/java/com/bw/qanda/service/QandATaskService.java +++ b/src/main/java/com/bw/qanda/service/QandATaskService.java @@ -15,4 +15,9 @@ public interface QandATaskService { * @param task */ public void qandA(Map task); + /** + * 本地问答执行方法 + * @param task + */ + public void localQandA(Map task); } diff --git a/src/main/java/com/bw/qanda/service/QandaService.java b/src/main/java/com/bw/qanda/service/QandaService.java index 1e29987..e9e1faa 100644 --- a/src/main/java/com/bw/qanda/service/QandaService.java +++ b/src/main/java/com/bw/qanda/service/QandaService.java @@ -14,5 +14,11 @@ public interface QandaService { * @return */ public String putQuestion(String dataJson); + /** + * 本地问答 + * @param dataJson + * @return + */ + public String putLocalQuestion(String dataJson); } diff --git a/src/main/java/com/bw/qanda/service/impl/QandATaskServiceImpl.java b/src/main/java/com/bw/qanda/service/impl/QandATaskServiceImpl.java index 51760f8..9ebf80f 100644 --- a/src/main/java/com/bw/qanda/service/impl/QandATaskServiceImpl.java +++ b/src/main/java/com/bw/qanda/service/impl/QandATaskServiceImpl.java @@ -35,6 +35,10 @@ public class QandATaskServiceImpl implements QandATaskService { private SpringBootKafka springBootKafka; @Value("${customize-kafka.producer.topic}") private String topic; + @Value("${localModel.apiKey}") + private String localQaKey; + @Value("${localModel.url}") + private String localModelUrl; @Override public void qandA(Map task) { try { @@ -144,6 +148,110 @@ public class QandATaskServiceImpl implements QandATaskService { } } + + @Override + public void localQandA(Map task) { + try { + log.info("本地任务:{}",JSONObject.toJSONString(task)); + //结果接收 + Map result = new HashMap(16); + //结果内容 + StringBuffer chatContent = new StringBuffer(); + //输入配置 + Map input = (Map) task.get(Constants.INPUT); + //输出配置 + Map output = (Map) task.get(Constants.OUTPUT); + //数据源 + Map data = (Map) task.get(Constants.DATA); + int scenesId = (int) task.get(Constants.SCENES_ID); + int version = (int) task.get(Constants.VERSION); + String pauseKey = scenesId + "_" + version; + if (!PauseTool.CACHE.containsKey(pauseKey)) { + log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version); + return; + } + //fieldType:自定义输出字段: 0 关闭,1-开启,如果开启则拼接form到output里(如果关闭,则取默认的output拼接) + int fieldType = (int) input.get(Constants.FIELDTYPE); + Float temperature = Float.valueOf(input.get(Constants.TEMPERATURE).toString()); + Float topP = Float.valueOf(input.get(Constants.TOP_P).toString()); + List> prompt = (List>) input.get(Constants.PROMPT); + String answerStr = localQandaRequest(temperature, topP, prompt, data); + log.info("answerStr:" + answerStr); + Map answer = JSONObject.parseObject(answerStr); + try { + //请求成功,正常解析 + Map message = (Map) answer.get(Constants.MESSAGE); + chatContent.append(message.get(Constants.CONTENT)); + } catch (Exception e) { + log.error("问答接口响应体异常:{}", answerStr, e); + // TODO: handle exception + //结果集 + Map results = new HashMap(16); + //遍历入库返回结果,拼接响应内容 + results.put("isLast", 1); + results.put("content", answerStr); + result.put(Constants.RESULTS, JSONObject.toJSONString(results)); + result.put(Constants.MESSAGE, "问答异常"); + result.put(Constants.STATUS, 2); + task.put(Constants.RESULT, result); + //发送kafka + springBootKafka.send(topic, JSONObject.toJSONString(task)); + log.info("数据流转至下游-------"); + return; + } + Map results = new HashMap(16); + results.put(Constants.ID, UUID.randomUUID().toString()); + results.put(Constants.CONTENT, chatContent.toString()); + if (fieldType != 0) { + results.remove(Constants.CONTENT); + try { + //请求成功,正常解析 + Map message = (Map) answer.get(Constants.MESSAGE); + String reponseContent = (String) message.get(Constants.CONTENT); + Map stringObjectMap = GPTResultParseUtil.parseGPTResult(output, reponseContent); + results.putAll(stringObjectMap); + } catch (Exception e) { + log.error("问答接口响应体异常:{}", answerStr, e); + // TODO: handle exception + //结果集 + //遍历入库返回结果,拼接响应内容 + results.put("isLast", 1); + results.put("content", answerStr); + result.put(Constants.RESULTS, JSONObject.toJSONString(results)); + result.put(Constants.MESSAGE, "问答异常"); + result.put(Constants.STATUS, 2); + task.put(Constants.RESULT, result); + //发送kafka + springBootKafka.send(topic, JSONObject.toJSONString(task)); + log.info("数据流转至下游-------"); + return; + } + } + results.put("isLast", 1); + result.put(Constants.RESULTS, JSONObject.toJSONString(results)); + result.put(Constants.MESSAGE, "成功"); + result.put(Constants.STATUS, 1); + task.put(Constants.RESULT, result); + //发送kafka + springBootKafka.send(topic, JSONObject.toJSONString(task)); + log.info("数据流转至下游-------"); + } catch (Throwable e) { + log.error("问答处理异常,", e); + //结果集 + Map results = new HashMap(16); + Map result = new HashMap(16); + //遍历入库返回结果,拼接响应内容 + results.put("isLast", 1); + results.put("content", e.getMessage()); + result.put(Constants.RESULTS, JSONObject.toJSONString(results)); + result.put(Constants.MESSAGE, "异常"); + result.put(Constants.STATUS, 2); + task.put(Constants.RESULT, result); + //发送kafka + springBootKafka.send(topic, JSONObject.toJSONString(task)); + log.info("数据流转至下游-------"); + } + } /** * 问题请求 * @@ -178,6 +286,37 @@ public class QandATaskServiceImpl implements QandATaskService { String html = DownLoadUtil.doPost(Constants.DEEPSEEK_CHAT_URL, JSONObject.toJSONString(params), headers); return html; } + /** + * 本地模型请求 + * @param temperature + * @param topP + * @param prompt + * @param data + * @return + */ + private String localQandaRequest(Float temperature, Float topP, List> prompt, Map data) { + //新建聊天话术 + StringBuffer chatContent = new StringBuffer(); + for (Map map : prompt) { + if (Integer.valueOf(map.get(Constants.TYPE).toString()) == Constants.CHAT_TYPE_ONE) { + chatContent.append(map.get(Constants.VALUE)); + } else { + String jsonPath = (String) map.get(Constants.VALUE); + chatContent.append(DataUtil.getValue(jsonPath, data).toString()); + } + } + Map headers = new HashMap(16); + Map params = new HashMap(16); + List> prompts = buildParam(chatContent.toString(), data); + params.put(Constants.MESSAGES, prompts); + params.put(Constants.TEMPERATURE, temperature); + params.put(Constants.TOP_P, topP); + params.put(Constants.MAX_TOKENS, 512); + headers.put("Content-Type", "application/json"); + headers.put("X-API-Key", localQaKey); + String html = DownLoadUtil.doPost(localModelUrl, JSONObject.toJSONString(params), headers); + return html; + } /** * 参数构建 diff --git a/src/main/java/com/bw/qanda/service/impl/QandaServiceImpl.java b/src/main/java/com/bw/qanda/service/impl/QandaServiceImpl.java index 3a01c5c..dfbe636 100644 --- a/src/main/java/com/bw/qanda/service/impl/QandaServiceImpl.java +++ b/src/main/java/com/bw/qanda/service/impl/QandaServiceImpl.java @@ -53,5 +53,36 @@ public class QandaServiceImpl implements QandaService { return JSONObject.toJSONString(response); } - + /** + *本地问答队列搭建 + */ + @Override + public String putLocalQuestion(String dataJson) { + Map response = new HashMap<>(16); + int code = 200; + String message = "success"; + Map task = null; + try { + task = JSONObject.parseObject(dataJson); + } catch (Exception e) { + log.error("参数结构不合法,", e); + code = 100010; + message = "参数不合法"; + } + // 写入队列 + try { + if(task.containsKey(Constants.TRACE) && (boolean)task.get(Constants.TRACE)){ + ConfigCache.localTaskQueue.putFirst(task); + }else{ + ConfigCache.localTaskQueue.put(task); + } + } catch (InterruptedException e) { + log.error("任务写入等待队列异常,", e); + code = 100011; + message = "任务写入等待队列失败"; + } + response.put(Constants.CODE, code); + response.put(Constants.MESSAGE, message); + return JSONObject.toJSONString(response); + } } diff --git a/src/main/java/com/bw/qanda/utils/DownLoadUtil.java b/src/main/java/com/bw/qanda/utils/DownLoadUtil.java index 16fabb7..b9a7aad 100644 --- a/src/main/java/com/bw/qanda/utils/DownLoadUtil.java +++ b/src/main/java/com/bw/qanda/utils/DownLoadUtil.java @@ -295,7 +295,7 @@ public class DownLoadUtil { public static String doPost(String url, String params, Map... headers){ String strResult = ""; //设置超时时间 - int timeout = 60; + int timeout = 120; RequestConfig config = RequestConfig.custom(). setConnectTimeout(timeout * 1000). setConnectionRequestTimeout(timeout * 1000). diff --git a/src/main/java/com/bw/qanda/utils/GPTResultParseUtil.java b/src/main/java/com/bw/qanda/utils/GPTResultParseUtil.java index 46d2275..b72be0f 100644 --- a/src/main/java/com/bw/qanda/utils/GPTResultParseUtil.java +++ b/src/main/java/com/bw/qanda/utils/GPTResultParseUtil.java @@ -1,6 +1,9 @@ package com.bw.qanda.utils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; + +import lombok.extern.slf4j.Slf4j; + import com.alibaba.fastjson.JSONException; import java.util.HashMap; import java.util.Map; @@ -14,6 +17,7 @@ import java.util.regex.Pattern; * @description: * @Date:2024/6/28 10:11 */ +@Slf4j public class GPTResultParseUtil { public static Map parseGPTResult(Map output, String gptContent) { Map jsonResult = new HashMap<>(); @@ -34,7 +38,9 @@ public class GPTResultParseUtil { Pattern pattern = Pattern.compile("\\{.*\\}", Pattern.DOTALL); Matcher matcher = pattern.matcher(gptContent.replace("\n", "")); if (matcher.find()) { - JSONObject jsonGPT = JSON.parseObject(matcher.group()); + String reslut = matcher.group(); + log.info("匹配json的结果:{}",reslut); + JSONObject jsonGPT = JSON.parseObject(reslut); for (String key : output.keySet()) { if (jsonGPT.containsKey(key)) { jsonResult.put(key, jsonGPT.get(key)); @@ -45,7 +51,7 @@ public class GPTResultParseUtil { return null; } } catch (Exception ex) { - ex.printStackTrace(); + log.error("匹配json结果失败:",ex); return null; } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3fb6a20..af87829 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -92,6 +92,10 @@ customize-kafka: topic: produce_analyze task: task-queue-path: ../data/taskQueue.txt + local-task-queue-path: ../data/localTaskQueue.txt +localModel: + apiKey: deepseek-7b-id + url: http://192.168.2.112:8000/v1/chat threadPool: corePoolSize: 2 maximumPoolSize: 5