Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务
Posted roykingw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务相关的知识,希望对你有一定的参考价值。
通过Kafka的幂等性特性,可以保证单条消息的数据安全性。而在此基础上,Kafka还为批量消息的数据安全性,设计提供了消息事务功能。**Kafka的生产者消息事务,是保证同一批次的多条消息,可以同时保证同时写入成功或者同时写入失败。**注意:要使用生产者消息事务,需要先保证幂等性功能是开启的。
这涉及到KafkaProducer的几个API:
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
对于单个Producer,可以通过生产者事务机制保证这一批消息刚好发送一次。
public class TransactionProducer
private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
private static final String TOPIC = "disTopic";
public static void main(String[] args)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置事务 id(必须),事务 id 任意起名,建议包含一定的业务唯一性
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order_1");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>(TOPIC, "Message " + i));
// 提交事务
kafkaProducer.commitTransaction();
catch (Exception e)
// 终止事务
kafkaProducer.abortTransaction();
finally
kafkaProducer.close();
对于幂等性这个话题,再延伸到消费者端。其实Broker与消费者端是有消息重试机制的,所以要保证消费者端的消息幂等性,是比较困难的。通常,主要还是在消费者端的业务逻辑中自行保证业务幂等性。比如,对于转账交易消息,可以通过交易ID先去后端判断一下消息有没有处理过。如果已经处理过,就放弃这条消息。
以上是关于Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务的主要内容,如果未能解决你的问题,请参考以下文章
Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务
Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性
Kafka3.x核心速查手册二客户端使用篇-6消息发送幂等性