Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Posted roykingw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务相关的知识,希望对你有一定的参考价值。

​ 通过Kafka的幂等性特性,可以保证单条消息的数据安全性。而在此基础上,Kafka还为批量消息的数据安全性,设计提供了消息事务功能。**Kafka的生产者消息事务,是保证同一批次的多条消息,可以同时保证同时写入成功或者同时写入失败。**注意:要使用生产者消息事务,需要先保证幂等性功能是开启的。

​ 这涉及到KafkaProducer的几个API:

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

​ 对于单个Producer,可以通过生产者事务机制保证这一批消息刚好发送一次。

public class TransactionProducer 
    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) 
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置事务 id(必须),事务 id 任意起名,建议包含一定的业务唯一性
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order_1");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        try 
            for (int i = 0; i < 5; i++) 
                kafkaProducer.send(new ProducerRecord<>(TOPIC, "Message " + i));
            
            // 提交事务
            kafkaProducer.commitTransaction();
         catch (Exception e) 
            // 终止事务
            kafkaProducer.abortTransaction();
         finally 
            kafkaProducer.close();
        
    

​ 对于幂等性这个话题,再延伸到消费者端。其实Broker与消费者端是有消息重试机制的,所以要保证消费者端的消息幂等性,是比较困难的。通常,主要还是在消费者端的业务逻辑中自行保证业务幂等性。比如,对于转账交易消息,可以通过交易ID先去后端判断一下消息有没有处理过。如果已经处理过,就放弃这条消息。

以上是关于Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务的主要内容,如果未能解决你的问题,请参考以下文章

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性

Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性

Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性

Kafka3.x核心速查手册二客户端使用篇-5发送应答机制

Kafka3.x核心速查手册二客户端使用篇-5发送应答机制