FlinkFLink 写入kafka 中关于 Exactly-Once 的一些思考

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFLink 写入kafka 中关于 Exactly-Once 的一些思考相关的知识,希望对你有一定的参考价值。

1.概述

首先看看文章:【Flink】介绍Flink中状态一致性的保证

根据文章内容化,我们知道kafka写写入是2阶段提交。2阶段提交看起来挺令人迷惑的,其实就是分2中情况嘛。

1.1 sink带事务

带事务的sink端,一般都mysql,Oracle,Kafka等。

比如kafka操作,我们一般事务代码如下

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.186:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                Future<RecordMetadata> send = producer
                        .send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
                System.out.println(send.get().offset());
                TimeUnit.MILLISECONDS.sleep(1000L);
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }

可以看到核心就是

 producer.initTransactions();
try {
     producer.beginTransaction();
     ProducerRecord aa = new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))Future<RecordMetadata> send = producer.send(aa);
     producer.commitTransaction();

 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();
}

这段代码只不过是在2阶段提交的时候做了一些包装罢了,没有什么特别深奥的。


在上面这一步就相当于做了

 producer.initTransactions();
try {
     producer.beginTransaction();
     ProducerRecord aa = new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))Future<RecordMetadata> send = producer.send(aa);

这些操作,其实已经将数据写入到目标的组件了,但是有没有真正写入topic这个需要看对方怎么实现的。

然后这一步当所有的都完成了,那么才提交事务,其实就是做了提交事务操作

     producer.commitTransaction();

然后如果出问题了,那么也仅仅是取消事务

 catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }

其他支持事务的操作大概一样的。

1.1.1 flink事务

假设flink要支持事务,你该怎么搞?

1.1.1.1 state

假设是这种,flink消费一批数据后,没有立即发送数据,而且在sink端缓存了一下,然后在做checkpoint的时候 将本次的state全保存起来,放到hdfs中了,那么就意味着持久化了,然后将本批次发送出去,然后如果运气好,一次就发送成功,如下


事务成功后,就可以删除引用这个state的指针,释放掉,不重复发送了。

如果运气不好,那么就会重复发送,这样就会出现这种情况


可以看到这种情况下,数据重复了,如果sink到的组件没有去重功能数据就是重复了,但是如果对方做了数据去重功能,那么就是可以达到幂等性,就是相当于发送了一次。

这样的比如

  1. mysql的主键,第二次发送肯定会主键冲突

  2. ES的version,肯定会直接忽略第二次发送 参考:【elasticsearch】 基于_version进行乐观锁并发控制 完美知识又串联在一起了。

  3. kafka的数据压缩去重

1.1.1.2 flink不处理

上面可以看到如果数据攒一批然后发送可能造成sink端的数据重复,或者实时性不够,那么这种就是直接发送。

这种情况下,flink中每次来了一个数据都发送给对方,然后对方不直接放到要写入的topic,只是放到一个临时的topic,然后到了一段时间,checkpoin进行的时候,就将本次的state保存起来,然后提交事务。

这里一看就有几个问题。

  1. 假设提交事务的时候,挂了,导致没有提交成功,那么可以再次提交,让kafka将数据写入到真正的topic
  2. 如果提交一半失败了,类似如下方案

    这种情况下,再次提交肯定会出现数据重复了,需要存储组件做幂等性去重存储。
  3. 如果flink提交的时候,任务挂了呢?那么就需要从state中恢复,恢复的时候,看到有个引用的事务没有提交,那么需要再次提交。

1.2 sink端不带事务

sink端不带事务的比如hdfs,文件之类的。

1.2.1 临时文件&临时topic

如果是Kafka上面说了一种临时的topic方案。

或者可以是临时文件。比如要写入到kafka可以写一个
假设kafka保存的文件是

topic/index-0-100.txt

我们可以写一个文件是存储事务的如下

topic/index-100-200.tx

然后提交事务的时候,只需要将文件改个名字就好了,非常原子性。

1.1.2 预写日志

这种也比较常见,就是存储端将数据先写入到一个临时文件,然后自己在提交事务的时候,将数据真正写入。

2. kafka事务

因为还没了解够kafka事务,然后根据 【kafka】 kafka 2.3 版本 生产者和消费者事务 案例 找个案例结论。

  • 默认的普通消费者设置的是read_uncommitted 可以读取到未提交的事务数据。
  • 消费者设置read_uncommitted 可以读取到未提交的事务数据。
  • 消费者设置read_committed 只有在生产者提交事务的时候,才可以读取到数据,如果事务取消了,那么读取不到数据。

我猜测了另外一种实现。

kafka的事务数据是直接写入到真正topic的,但是也将事务信息写入到内置事务topic中了。

真正topic  分区   当前offset   事务结束offset
topic_lcc  1     100         200

事务topic   当前事务ID    事务元数据
事务topic   1001        分区1 start_offset=100 endOffset=200

然后

  • 当事务提交的时候,清除这个事务信息
  • 当事务还没提交的时候,如果设置的是read_uncommitted,那么假设消费者读取到100了,然后下次读取的时候,直接读取100-200,这样就读取到没提交的事务了。
  • 如果事务回滚了,那么,如果设置的是read_committed,那么假设消费者读取到100了,然后下次读取的时候,先看下事务是否提交,如果没有提交,那么就阻塞或者直接跳过100-200的数据,或者读取后缓存起来,这个处理应该比较麻烦,万一这个事务一直不提交怎么办,有很多问题。
  • 如果事务回滚了,那么,如果设置的是read_committed,那么假设消费者读取到100了,然后下次读取的时候,先看下事务是否提交,如果回滚了,那么就阻塞或者直接跳过100-200的数据,这个最简单。

2. 小结

可以看到如果想要真正的 Exactly-Once 需要source-> sink->外部组件,三个一起努力完成。

以上是关于FlinkFLink 写入kafka 中关于 Exactly-Once 的一些思考的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink kafka producer 分区策略 (flink写入数据不均匀 与 数据写入 分区无数据 )

FlinkFlink 写入 kafka 报错 The server disconnected before a response was received

FlinkFlink Kafka 消费卡死 消费组卡死 topic无写入 实际有数据 topic正常

FlinkFlink 写入 kafka 报错 Failed to send data to Kafka: Expiring 4 record(s) for 20001 ms has passed(代码

FlinkFlink 小知识点 Flink 同时 保存 offset 到backend 和 kafka 内置 topic

FlinkFlink 1.14.0 全新的 Kafka Connector