分布式消息队列Kafka使用过程中的那些坑

Posted 高并发架构设计

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息队列Kafka使用过程中的那些坑相关的知识,希望对你有一定的参考价值。

问1:怎么保证kafka不丢消息

   

     producer的ack设置。

    设置发送数据是否需要服务端的反馈,有三个值0,1,-1  

   # 0:producer不会等待broker发送ack  

   # 1:当leader接收到消息后发送ack  

   # -1:当所有的follower都同步消息成功后发送ack  

   request.required.acks=0  


问2:怎么保证消息只被消费一次 


    正常读不会重复,如果在上一次读的过程中发生了异常,消息可能被消费,但是offset没有及时commit;这本身是两步,存在中间crash的风险,真要保证消息不能被重复消费的话只能依靠其他逻辑判断当前消息是否被消费过,比如先查一下什么的,不过你真要求那么可靠么?


问3:消费端读到消息先commit还是处理完消息再Comit?


    读完消息先commit消费状态(保存offset)再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息;读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。

    根本解决这个问题的话需要本地收到消息持久化消息至本地一个文件or数据库,提交offset->处理消息->清空持久区。如果处理消息时crash,那么后台有守护进程重新把这个处理单元拉起来,看到了临时文件区有上次未处理的消息优先处理掉。这是以放弃效率换来的高可靠性。


问4:我有多个系统需要消费同一条消息,请问该怎么办?


    kafka消息可以通过订阅组来订阅消费,同一个组里只有一个订阅者会收到该消息,这样可以用作负载均衡。

    比如,kafka 中发布:topic = "高并发架构设计" data="MQ消息服务kafka" 这个消息,后面有一百台服务器每台服务器都是一个订阅者,都订阅了这个 topic,但是他们可能分为三组,A组50台,B25台,C组25台。

    A组50台机器由于在同一组,这条消息 (topic = "高并发架构设计" data="MQ消息服务kafka")只会被A组里面一台当前空闲的机器收到。而B组25台服务器用于统计,C组25台服务器用于存档备份,每组只有一台会收到。

    用不同的组来决定每条消息要抄送出多少分去,用同组内哪些订阅者忙,哪些订阅者空闲来决定消息会被分到哪台服务器去处理,生产者消费者模型。


问5:Kafka怎么处理超大文件


    Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?

    针对这个问题,有以下几个建议:

  • 最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。

  • 第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。

  • 第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中通过配置项的方式修改大消息大小。

    如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

  • 性能: kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。

  • 可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数*最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数*最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。

  • 垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。


    Kafka本身是为了高吞吐量而设计的,Kafka比较适合高吞吐量并且允许少量数据丢失的场景,所以一定要根据应用业务和使用场景来做技术选型。一切的一切,都需要在权衡利弊之后,再决定选用哪个最合适的方案。


以上是关于分布式消息队列Kafka使用过程中的那些坑的主要内容,如果未能解决你的问题,请参考以下文章

Kafka

kafka面试题

阿里云消息队列 Kafka-消息检索实践

kafka核心原理的秘密,藏在这16张图里

1.kafka基础架构

kafka分布式消息队列介绍以及集群安装