springboot kafka发送消息支持成功失败通知
Posted 别闹我真是09
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot kafka发送消息支持成功失败通知相关的知识,希望对你有一定的参考价值。
springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。
以下是演示用的发送类代码
@Slf4j @Component public class TestRunner implements ApplicationRunner { @Autowired KafkaTemplate kafkaTemplate; @Override public void run(ApplicationArguments args) throws Exception { KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity(); kafkaMsgEntity.setActionName("login"); String tmpStr = "id:%d,msg:login"; for (int i = 1; i < 500; i++) { String tmpStr1 = tmpStr.replace("%d", String.valueOf(i)); Thread.sleep(500); kafkaMsgEntity.setMsgBody(tmpStr1); kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable throwable) { if (throwable instanceof KafkaProducerException) { String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value(); log.info("{} get throwable msg:{}", value, throwable.getMessage()); } else { log.info("get throwable msg:{}", throwable.getMessage()); } } @Override public void onSuccess(SendResult<String, String> o) { log.info("{}, success", o.getProducerRecord().value()); } }); } } }
在kafka运行过程中kill进程达到异常发送的条件。
以上是关于springboot kafka发送消息支持成功失败通知的主要内容,如果未能解决你的问题,请参考以下文章
springboot集成kafka详细步骤(发送及监听消息示例)
kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?