springboot kafka发送消息支持成功失败通知

Posted 别闹我真是09

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot kafka发送消息支持成功失败通知相关的知识,希望对你有一定的参考价值。

springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。
以下是演示用的发送类代码

@Slf4j
@Component
public class TestRunner implements ApplicationRunner {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity();
        kafkaMsgEntity.setActionName("login");
        String tmpStr = "id:%d,msg:login";
        for (int i = 1; i < 500; i++) {
            String tmpStr1 = tmpStr.replace("%d", String.valueOf(i));
            Thread.sleep(500);
            kafkaMsgEntity.setMsgBody(tmpStr1);
            kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    if (throwable instanceof KafkaProducerException) {
                        String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value();
                        log.info("{} get throwable msg:{}", value, throwable.getMessage());
                    } else {
                        log.info("get throwable msg:{}", throwable.getMessage());
                    }
                }

                @Override
                public void onSuccess(SendResult<String, String> o) {
                    log.info("{}, success", o.getProducerRecord().value());
                }
            });
        }
    }
}

在kafka运行过程中kill进程达到异常发送的条件。

 

 

以上是关于springboot kafka发送消息支持成功失败通知的主要内容,如果未能解决你的问题,请参考以下文章

springboot集成kafka详细步骤(发送及监听消息示例)

Java API中kafka生产者发送消息没有成功

春季启动生产者在kafka重启后无法发送任何消息

kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?

RabbitMQ和kafka对于消费失败处理总结

关于Kafka幂等producer的讨论