package com.bfd.parse.utils; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * @PROJECT_NAME: companybusinesscrawl * @DESCRIPTION:SpringBootKafka 工具类 * @AUTHOR: ying.zhao * @DATE: 2023/4/6 11:09 */ @Slf4j @Component public class SpringBootKafka { @Autowired private KafkaTemplate kafkaTemplate; /** * 自定义topicKafkaTemplate */ /** * public static final String TOPIC = "companyBussTest"; **/ public void send(String topic, String message) { //发送消息 ListenableFuture> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { //发送失败的处理 log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功的处理 log.info("{} - 生产者 发送消息成功:",topic); } }); } }