Kafka消费者总结

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka消费者总结相关的知识,希望对你有一定的参考价值。

参考技术A 之后把KafkaListener的信息封装到MethodKafkaListenerEndpoint,再调用this.registrar.registerEndpoint(endpoint, factory);注册:KafkaListenerEndpointRegistrar实现了BeanFactoryAware, InitializingBean两个接口,因此在Spring初始化Bean的时候会遍历InitializingBean的所有实现类,并执行afterPropertiesSet方法

registerAllEndpoints方法将解析的KafkaListener封装到KafkaListenerEndpointDescriptor,然后注册到list里。registerListenerContainer为每一个KafkaListenerEndpointDescriptor生成一个MessageListenerContainer

KafkaMessageListenerContainer最终继承了Lifecycle,Spring在遍历所有的LifeStyle,执行start方法时KafkaMessageListenerContainer的dostart方法会被调用,实例化了KafkaListenerConsumer对象,ListenerConsumer实现了
Runnable,所以可以放入线程池中,这样可以并发执行,但是这里有个问题,就是getConsumerTaskExecutor如果没有配置线程池,默认的线程池是什么?

执行:
ListenerConsumer实现了Runnable,所以最终由run方法调用的poll()来拉取消息。

总结:
KafkaListener内部也是多线程消费,并且是多线程消费的第一种,一个线程实例化一个KafkaConsumer实例

kafka基础篇——kafka总结

前言

前面我们介绍了kafka的服务端,集群,生产者客户端和消费者客户端。由此我们看到,我们主要是以MQ的方式,对kafka进行的讲解和分析。除了MQ的功能外,kafka还可以用作流式处理等作用。下面我们就总结一下kafka的特点,优点以及适用场景。

一、优点

  • 多生产者和多消费者

  • 基于磁盘的数据存储,换句话说,Kafka 的数据天生就是持久化的。

  • 高伸缩性,Kafka 一开始就被设计成一个具有灵活伸缩性的系统,对在线集群的伸缩丝毫不影响整体系统的可用性。

  • 高性能,结合横向扩展生产者、消费者和 broker,Kafka 可以轻松处理巨大的信息流(LinkedIn 公司每天处理万亿级数据),同时保证亚秒级的消息 延迟。

通过前面的学习我们可以知道,kafka的分布式体现在主题分区的分布式,一个主题的分区可以分布在不同的服务器上,这样设计,对于生产者和消费者而言,都可以同时针对多台服务器进行消息的发送和接收,所以,kafka处理大数据时性能很高。这也是kafka作为MQ而言比其他MQ性能高的原因。

二、kafka数据安全保障总结

kafka服务端:
1.kafka复制机制
采用ISR复制机制:消费者和生产者与分区首领进行对接。分区首领维护这一个副本列表(ISR列表),分区的数据复制到ISR列表的其他副本中,首领中的数据只有复制到所有的ISR列表中的副本中,消息才会设置成commit状态,消费者才能消费该消息。
ISR列表是一个动态维护的列表,当发现有副本挂掉或者与首领数据差太多的话(差多少通过属性设置),会把这个副本剔除ISR列表中,当副本的复制速度跟上首领的数据时,再加入ISR列表。

2.最少同步副本机制
min.insync.replicas设置了最少需要同步的副本数量,来保证我们数据的副本数。当可用的副本数小于这个数字时,kafka不再接收生产者发送的消息,只能给消费者拉取消息,变成了只读状态。通过这种机制,保证各个副本之间的数据同步和安全。

3.不完全首领机制
kafka提供了unclean.leader.election参数,让我们选择完全首领还是不完全首领,我们可以根据自身业务需求,从数据安全和时间成本中,去选择哪种模式。

kafka生产者客户端:
1.acks设置:设置为all,保证副本之间数据的同步。
2.消息的确认机制:发送忘记、同步发送确认、异步发送确认机制。

kafka消费者客户端:
1.enable.auto.commit设置为false,手动提交偏移量
2.分区再均衡监听器的使用,保证偏移量的提交以及分区再均衡后的正确拉取偏移量的位置

三、kafka高性能设置

1.硬件配置
2.集群中broker数量和主题中分区数量以及集群的副本数。

四、kafka配置总结

kafka很多方面通过配置可以提高性能,现在总结如下:

kafka服务端参数配置:

  1. 消息吞吐量设置

message.max.bytes:表示一个服务器能够接收处理的单个消息的最大字节数,注意这个值 producer 和 consumer 必须设置一致。这个值设置成10K性能是最佳的。

  1. 过期机制设置

log.retention.bytes、log.retention.hours、log.segment.bytes三个参数配合使用:
log.retention.bytes是分区日志文件大小限制,设置成-1,意思是文件可以无限大。
log.retention.hours是分区日志时间保存期限,根据我们的需求设置期限。
log.segment.bytes是每个分区日志文件的大小限制,超过后,会自动新建一个分区日志文件。
这三个参数配合使用,使日志文件按照时间进行过期,且分批进行过期。

  1. 顺序性保证设置
    max.in.flight.requests.per.connection设置为1。意思是当一个生产者进行重试时,其他生产者的消息不写入日志文件。

  2. ISR列表维护机制
    replica.lag.max.messages:设置副本数据与首领相差多少,就剔除ISR列表。 0.9.0.0版本后移除了replica.lag.max.messages参数。因为当大数据量瞬间写入kafka时,可能其他副本都没有第一时间同步那么多数据,然后被剔除了ISR列表。等副本同步完首领数据后,又加入了ISR列表,这么反复的剔除和加入,有问题,所以这个参数后面版本剔除了。
    replica.lag.time.max.ms:设置其他副本与首领不同步的时间,超过这个时间,剔除ISR列表。即使在峰值流量下,生产者往 leader 发送大量的消息,除非副本始终和 leader 保持 replica.lag.time.max.ms 时间的落后,否则它不会随机进出 ISR。

  3. 最少同步副本机制
    min.insync.replicas参数:设置kafka最少需要同步数据的副本数量。

  4. 完全首领机制与不完全首领机制的切换
    unclean.leader.election参数默认为true,支持不完全首领机制。我们可以根据实际业务切换机制。

五、kafka高性能设计思想

1.最少使用原则的应用:
消息的分发,分区首领的选择,都采用了最少使用原则的思想,将工作量均衡的分散到各个broker之间,提高了工作效率。

2.零拷贝技术

3.磁盘顺序写入

4.页缓存

以上是关于Kafka消费者总结的主要内容,如果未能解决你的问题,请参考以下文章

kafka学习总结012 --- 数据消费相关流程

kafka 分区分配及再平衡总结

Kafka知识总结

kafka基础篇——kafka总结

kafka总结

kafka相关知识点总结