kafka--- consumer 消费消息

Posted 芹溪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka--- consumer 消费消息相关的知识,希望对你有一定的参考价值。

1、 consumer API

kafka 提供了两套 consumer API:

1. The high-level Consumer API
2. The SimpleConsumer API

 其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。

1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

1.2 The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

1. 多次读取一个消息
2. 只消费一个 patition 中的部分消息
3. 使用事务来保证一个消息仅被消费一次

 但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
3. 需要处理 leader 的变更

 使用 SimpleConsumer API 的一般流程如下:

1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
2. 找出每个 partition 的 follower
3. 定义好请求,该请求应该能描述应用程序需要哪些数据
4. fetch 数据
5. 识别 leader 的变化,并对之作出必要的响应

以下针对 high-level Consumer API 进行说明。

2、 consumer group

如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

 

图.8

3、 消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

4 、consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee1.读完消息先 commit 再处理消息。    这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once

2.读完消息先处理再 commit。
    这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。
    精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能
把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level
API而言,
offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once(见文章《kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协作,幸运的是 kafka 提供的 offset 可以非常直接非常容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》。

5 、consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

1. 将目标 topic 下的所有 partirtion 排序,存于PT
2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

1.Herd effect
  任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
2.Split Brain
  每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从
zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。 3. 调整结果不可控   所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。(见文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此处不再赘述。

以上是关于kafka--- consumer 消费消息的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-Consumer

kafka 0.10.2 消息消费者

如何增加Spring Kafka Consumer每批消费的消息数?

Kafka Consumer配置——auto.offset.reset如何控制消息消费

Kafka Consumer(消费者组)

troubleshooting记一次Kafka集群重启导致消息重复消费问题处理记录