|
|
package com.bfd.task.process;
import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;import com.bfd.task.cache.ConfigCache;import com.bfd.task.entity.Constants;import com.bfd.task.utils.QueueUtil;
import java.time.Duration;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;
import lombok.extern.slf4j.Slf4j;
/** * @author jian.mao * @date 2023年9月21日 * @description */@Slf4j@Componentpublic class DataConsumptionProcess implements Runnable {
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${customize-kafka.consumer.topic}") private String topic; @SuppressWarnings("unchecked") @Override public void run() { // 创建 Kafka 消费者配置
Map<String, Object> consumerProps = new HashMap<String, Object>(16); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //跟读
consumerProps.put("auto.offset.reset", "latest"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); consumerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps); try { // 订阅主题
consumer.subscribe(Collections.singletonList(topic)); // 消费消息
while (true) { // 没超时的话正常消费数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { Map<String, Object> resultData = new HashMap<String, Object>(32); try { // 处理消息,这里可以根据需要进行业务处理
Map<String, Object> resultEs = JSONObject.parseObject(record.value()); log.info("Received message: "+ record.value()); if(!resultEs.containsKey(Constants.TASKID)){ log.warn("数据体缺少taskId"); continue; } String taskId = resultEs.get(Constants.TASKID).toString(); if(!ConfigCache.taskCache.containsKey(taskId)){ log.warn("不属于有知任务产出的数据,taskId:{}",taskId); continue; } Map<String, Object> task = (Map<String, Object>) ConfigCache.taskCache.get(taskId); String token = (String) task.get(Constants.BUSINESSKEY); Map<String, Object> input = (Map<String, Object>) task.get(Constants.INPUT); Integer hasVideo = (Integer) input.get(Constants.HASVIDEO); if(resultEs.get(Constants.HASVIDEO).equals(hasVideo)){ Map<String,Object> crawlResults = JSONObject.parseObject(record.value()); //结果加工 例如videopath[]转换成String
bulidResult(crawlResults); // 结果集组装
Map<String, Object> result = new HashMap<String, Object>(16); //结果内容
Map<String,Object> data = new HashMap<String, Object>(16); //获取输出字段
Map<String,Object> output = (Map<String, Object>) task.get(Constants.OUTPUT); for (String key: output.keySet()) { if (crawlResults.containsKey(key)){ data.put(key,crawlResults.get(key)); } } result.put(Constants.RESULTS, JSONObject.toJSONString(data)); for (String key : task.keySet()) { resultData.put(key, task.get(key)); } result.put(Constants.STATUS, 1); result.put(Constants.MESSAGE, "成功"); resultData.put(Constants.RESULT, result); QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData)); //taskId赋值
if(resultEs.containsKey(Constants.TASKID)){ if(taskId == null){ taskId = resultEs.get(Constants.TASKID).toString(); } } }else{ log.info("不符合需求数据----"); } } catch (Exception e) { // TODO: handle exception
log.error("数据格式异常:{}",record.value()); //结果集
Map<String, Object> result = new HashMap<String, Object>(16); //遍历入库返回结果,拼接响应内容
result.put(Constants.RESULTS, e.getMessage()); result.put(Constants.MESSAGE, "异常"); result.put(Constants.STATUS, 2); resultData.put(Constants.RESULT, result); //发送kafka
QueueUtil.sendQueue.put(JSONObject.toJSONString(resultData)); } } } } catch (Exception e) { log.error("kafka消费异常\n", e); consumer.close(); } } /** * 结果加工 * @param result */ private void bulidResult(Map<String, Object> result){ //视频gofast地址加工
List<String> videoPath = (List<String>) result.get(Constants.VIDEOPATH); if(videoPath != null && videoPath.size() > 0){ String videoUrl = videoPath.get(0); result.put(Constants.VIDEOPATH, videoUrl); } }}
|