Kafka 事务和幂等详解
Posted spark技术分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 事务和幂等详解相关的知识,希望对你有一定的参考价值。
1 生产者幂等性
1.1 引入
幂等性引入目的:
生产者重复生产消息。生产者进行retry会产生重试时,会重复产生消息。有了幂等性之后,在进行retry重试时,只会生成一个消息。
1.2 幂等性实现
1.2.1 PID 和 Sequence Number
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。
Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。
实现幂等之后
1.2.2 生成PID的流程
在执行创建事务时,如下
Producer<String, String> producer = new KafkaProducer<String, String>(props);
会创建一个Sender,并启动线程,执行如下run方法,在maybeWaitForProducerId()中生成一个producerId,如下:
1.3 幂等性的应用实例
1、配置属性
需要设置:
enable.idempotence,需要设置为ture,此时就会默认把acks设置为all,所以不需要再设置acks属性了。
2、发送消息
跟一般生成者一样,如下
此时,因为我们并没有配置transaction.id属性,所以不能使用事务相关API,如下
producer.initTransactions();
否则会出现如下错误:
2 事务属性
2.1 事务属性理解
事务属性是2017年Kafka 0.11.0.0引入的新特性。类似于数据库事务,只是这里的数据源是Kafka,kafka事务属性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。
注意:在理解消息的事务时,一直处于一个错误理解就是如下代码中,把操作db的业务逻辑跟操作消息当成是一个事务。其实这个是有问题的,操作DB数据库的数据源是DB,消息数据源是kfaka,这是完全不同两个数据,一种数据源(如mysql,kafka)对应一个事务,所以它们是两个独立的事务:kafka事务指kafka一系列 生产、消费消息等操作组成一个原子操作;db事务是指操作数据库的一系列增删改操作组成一个原子操作。
2.2 引入事务目的
在事务属性之前先引入了生产者幂等性,它的作用为:
生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败
consumer-transform-producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息。需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。
消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量o2之前挂掉了(假设它最近提交的偏移量是o1),此时执行再均衡时,其它消费者会重复消费消息(o1到o2之间的消息)。
2.3 事务操作的API
producer提供了initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五个事务方法。
3 事务属性的应用实例
在一个原子操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引入的场景,最后一种情况没有使用价值。
只有Producer生产消息;
消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式
只有consumer消费消息,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的,所以一般不会使用这种情况
3.1 相关属性配置
使用kafka的事务api时的一些注意事项:
需要消费者的自动模式设置为false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc
生产者配置transaction.id属性
生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
消费者需要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。
3.2 只有写
创建一个事务,在这个事务操作中,只有生成消息操作。代码如下:
创建生成者,代码如下,需要:
配置transactional.id属性
配置enable.idempotence属性
3.3 消费-生产并存(consume-transform-produce)
在一个事务中,既有生产消息操作又有消费消息操作,即常说的Consume-tansform-produce模式。如下实例代码
创建消费者代码,需要:
将配置中的自动提交属性(auto.commit)进行关闭
而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
设置isolation.level
3.4 只有读
创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量操作在一个事务。
4 生产者事务的实现
4.1 相关配置
4.1.1 Broker configs
4.1.2 Producer configs
4.1.3 Consumer configs
read_uncommitted,类似没有设置事务属性的consumer,即就是我们平常使用的consumer,只要消息写入到文件中就可以进行读取。
4.2 幂等性和事务性的关系
4.2.1 两者关系
事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
幂等性引入了Porducer ID
事务属性引入了Transaction Id属性。、
设置
enable.idempotence = true,transactional.id不设置:只支持幂等性。
enable.idempotence = true,transactional.id设置:支持事务属性和幂等性
enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka
enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错
4.2.2 tranaction id 、productid 和 epoch
一个app有一个tid,同一个应用的不同实例PID是一样的,只是epoch的值不同。如:
同一份代码运行两个实例,分步执行如下:在实例1没有进行提交事务前,开始执行实例2的初始化事务
step1 实例1-初始化事务。的打印出对应productId和epoch,信息如下:
[2018-04-21 20:56:23,106] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 123 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)
step2 实例1-发送消息。
step3 实例2-初始化事务。初始化事务时的打印出对应productId和epoch,信息如下:
18-04-21 20:56:48,373] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 124 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)
step4 实例1-提交事务,此时报错
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.
step5 实例2-提交事务
为了避免这种错误,同一个事务ID,只有保证如下顺序epch小producer执行init-transaction和committransaction,然后epoch较大的procuder才能开始执行init-transaction和commit-transaction,如下顺序:
有了transactionId后,Kafka可保证:
跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作【上面的实例可以验证】。kafka保证了关联同一个事务的所有producer(一个应用有多个实例)必须按照顺序初始化事务、和提交事务,否则就会有问题,这保证了同一事务ID中消息是有序的(不同实例得按顺序创建事务和提交事务)。
4.3 事务最佳实践-单实例的事务性
通过上面实例中可以看到kafka是跨Session的数据幂等发送,即如果应用部署多个实例时常会遇到上面的问题“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必须保证这些实例生成者的提交事务顺序和创建顺序保持一致才可以,否则就无法成功。其实,在实践中,我们更多的是如何实现对应用单实例的事务性。可以通过spring-kafaka实现思路来学习,即每次创建生成者都设置一个不同的transactionId的值,如下代码:
在spring-kafka中,对于一个线程创建一个producer,事务提交之后,还会关闭这个producer并清除,后续同一个线程或者新的线程重新执行事务时,此时就会重新创建producer。
创建消费者代码
4.4 Consume-transform-Produce 的流程
流程1 :查找Tranaction Corordinator。
流程2:初始化事务 initTransaction
Producer发送InitpidRequest给事务协调器,获取一个Pid。InitpidRequest的处理过程是同步阻塞的,一旦该调用正确返回,Producer就可以开始新的事务。TranactionalId通过InitpidRequest发送给Tranciton Corordinator,然后在Tranaciton Log中记录这<TranacionalId,pid>的映射关系。除了返回PID之外,还具有如下功能:
对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样的,但是epoch是不同的。
回滚之前的Producer未完成的事务(如果有)。
流程3: 开始事务beginTransaction
执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。
注意:这个操作并没有通知Transaction Coordinator。
流程4: Consume-transform-produce loop
流程4.0: 通过Consumtor消费消息,处理业务逻辑
流程4.1: producer向TransactionCordinantro发送AddPartitionsToTxnRequest
在producer执行send操作时,如果是第一次给<topic,partion>发送数据,此时会向Trasaction Corrdinator发送一个AddPartitionsToTxnRequest请求,Transaction Corrdinator会在transaction log中记录下tranasactionId和<topic,partion>一个映射关系,并将状态改为begin。AddPartionsToTxnRequest的数据结构如下:
流程4.2: producer#send发送 ProduceRequst
生产者发送数据,虽然没有还没有执行commit或者absrot,但是此时消息已经保存到kafka上,可以参考如下图断点位置处,此时已经可以查看到消息了,而且即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。
流程4.3: AddOffsetCommitsToTxnRequest
Producer通过KafkaProducer.sendOffsetsToTransaction 向事务协调器器发送一个AddOffesetCommitsToTxnRequests:
在执行事务提交时,可以根据ConsumerGroupID来推断_customer_offsets主题中相应的TopicPartions信息。这样在
流程4.4: TxnOffsetCommitRequest
Producer通过KafkaProducer.sendOffsetsToTransaction还会向消费者协调器Cosumer Corrdinator发送一个TxnOffsetCommitRequest,在主题_consumer_offsets中保存消费者的偏移量信息。
流程5: 事务提交和事务终结(放弃事务)
通过生产者的commitTransaction或abortTransaction方法来提交事务和终结事务,这两个操作都会发送一个EndTxnRequest给Transaction Coordinator。
流程5.1:EndTxnRequest。Producer发送一个EndTxnRequest给Transaction Coordinator,然后执行如下操作:
Transaction Coordinator会把PREPARE_COMMIT or PREPARE_ABORT 消息写入到transaction log中记录
执行流程5.2
执行流程5.3
流程5.2:WriteTxnMarkerRequest
对于Producer生产的消息。Tranaction Coordinator会发送WriteTxnMarkerRequest给当前事务涉及到每个<topic,partion>的leader,leader收到请求后,会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到data log中
对于消费者偏移量信息,如果在这个事务里面包含_consumer-offsets主题。Tranaction Coordinator会发送WriteTxnMarkerRequest给Transaction Coordinartor,Transaction Coordinartor收到请求后,会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到 data log中。
流程5.3:Transaction Coordinator会将最终的COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束。
只会保留这个事务对应的PID和timstamp。然后把当前事务其他相关消息删除掉,包括PID和tranactionId的映射关系。
4.4.1 文件类型和查看命令
kafka文件主要包括broker的data(主题:test)、事务协调器对应的transaction_log(主题:__tranaction_state)、偏移量信息(主题:_consumer_offsets)三种类型。如下图
这三种文件类型其实都是topic的分区,所以对于每一个目录都包含*.log、*.index、*.timeindex、*.txnindex文件(仅这个文件是为了实现事务属性引入的)。segment和segmengt对应index、timeindex、txnindex文件命名中序号表示的是第几个消息。如下图中,00000000000000368769.index和00000000000000568769.log中“368969”就是表示文件中存储的第一个消息是468969个消息。
对于索引文案包含两部分:
baseOffset:索引对应segment文件中的第几条message。
position:在segment中的绝对位置。
查看文件内容:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments –files /Users/wuzhonghu/data/kafka-logs/firtstopic-0/00000000000000000002.log –print-data-log
4.4.2 ControlMessage和Transaction markers
Trasaction markers就是kafka为了实现事务定义的Controll Message。这个消息和数据消息都存放在log中,在Consumer读取事务消息时有用,可以参考下面章节-4.5.1 老版本-读取事务消息顺序。
4.4.3 Transaction Coordinator 和 Transaction Log
Transaction Log如下放置在“_tranaction_state”主题下面,默认是50个分区,每一个分区中文件格式和broker存储消息是一样的,都有log/index/timeindex文件,如下:
4.5 消费读取事务消息(READ_COMMITED)
Consumer为了实现事务,新增了一个isolation.level配置,有两个值如下,
READ_UNCOMMITTED,类似于没有事务属性的消费者。
READ_COMMITED,只获取执行了事务提交的消息。
在本小节中我们主要讲READ_COMMITED模式下读取消息的流程的两种版本的演化
4.5.1 老版本-读取事务消息顺序
如下图中,按顺序保存到broker中消息有:事务1消息T1-M1、对于事务2的消息有T2-M1、事务1消息T1-M2、非事务消息M1,最终到达client端的循序是M1-> T2-M1 -> T1-M1 -> T1-M2。
具体步骤如下:
step1 Consumer接受到事务消息T1-M1、T2-M2、T1-M2和非事务消息M1,因为没有收到事务T1和T2的控制消息,所以此时把事务相关消息T1-M1、T2-M2、T1-M2 保存到内存,然后只把非事务消息M1返回给client。
step2 Consumer接受到事务2的控制消息T2-C,此时就把事务消息T2-M1发送给Clinet。
step3 C onsumer接受到事务1的控制消息T1-C,此时就把事务消息T1-M1和T1-M2发送给Client
4.5.2 新版本-读取事务消息顺序
第一种方式,需要在consumer客户端缓存消息,当存在耗时比较长的事务时,占用客户端大量的内存资源。为了解决这个问题,通过LSO和Abort Index 文件来解决这个问题,参考:
https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit
(1) LSO,Last stable offset。Broker在缓存中维护了所有处于运行状态的事务对应的initial offsets,LSO的值就是这些offsets中最小值-1。这样在LSO之前数据都是已经commit或者abort的数据,只有这些数据才对Consumer可见,即consumer读取数据只能读取到LSO的位置。
LSO并没有持久化某一个位置,而是实时计算出来的,并保存在缓存中。
(2)Absort Index文件
Conusmer发送FetchRequest中,新增了Isolation字段,表示是那种模式
返回数据类型为FetchResponse的格式为:
ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset AbortedTransactions MessageSetSize MessageSet]]
对应各个给字段类型为
设置成 READ_UNCOMMITTED 模式时, the AbortedTransactions array is null.
设置为READ_COMMITTED时,the Last Stable Offset(LSO),当事务提交之后,LSO向前移动offset
数据如下:
存放数据的log
存放Absort Index的内容如下:
执行读取数据流程如下:
step1: 假设consumer读取数据的fetched offsets的区间是0到4。
首先,broker读取data log中数据
然后,broker依次读取abort index的内容,发现LSO大于等于 4 就停止。如上可以获取到P2对应的offset从2到5的消息都是被丢弃的:
最后,broker将上面data log和abort index中满足条件的数据返回给consumer。
step2 :在consumer端根据absrot index中返回的内容,过滤丢弃的消息,最终给用户消息为
4.5.3 Absorted Transaction Index
在broker中数据中新增一个索引文件,保存aborted tranasation对应的offsets,只有事务执行abort时,才会往这个文件新增一个记录,初始这个文件是不存在的,只有第一条abort 时,才会创建这个文件。
这个索引文件结构的每一行结构是TransactionEntry:
当broker接受到控制消息(producer执行commitTransaction()或者abortTransaction())时, 执行如下操作:
(1)计算LSO。
Broker在缓存中维护了所有处于运行状态的事务对应的initial offsets,LSO的值就是这些offsets中最小值-1。
举例说明下LSO的计算,对于一个data log中内如如下
对应的abort index文件中内如如下:LSO是递增的
(2)第二步 如果事务是提交状态,则在索引文件中新增TransactionEntry。
(3)第三步 从active的tranaction set中移除这个transaton,然后更新LSO。
4.5.3 问题
1、问题1:producer通过事务提交消息时抛异常了, 对于使用非事务的消费者,是否可以获取此消息?
对于事务消息,必须是执行commit或者abstort之后,消息才对消费者可见,即使是非事务的消费者。只是非事务消费者相比事务消费者区别,在于可以读取执行了absort的消息。
5 其他思考
1、如何保证消息不丢。
(1)在消费端可以建立一个日志表,和业务处理在一个事务
定时扫描没有表发送没有被处理的消息
(2)消费端,消费消息之后,修改消息表的中消息状态为已处理成功。
2、如何保证消息提交和业务处理在同一个事务内完成
在消费端可以建立一个日志表,和业务处理在一个事务
3、消费者角度,如何保证消息不被重复消费。
(1)通过seek操作
(2)通过kafka事务操作。
4、生产者角度,如何保证消息不重复生产
(1)kakfka幂等性
以上是关于Kafka 事务和幂等详解的主要内容,如果未能解决你的问题,请参考以下文章