消息队列之-Kafka原理讲解

Posted 亦非我所愿丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列之-Kafka原理讲解相关的知识,希望对你有一定的参考价值。

文章目录




一、什么是消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题

实现高性能、高可用,可伸缩和最终一致性架构

使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ、MetaMQ


二、消息队列的四种场景介绍

以下介绍消息队列在实际应用中常用使用场景。异步处理,应用解偶,流量削锋和消息通讯四个场景

2.1 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种

  • (1)串行方式
  • (2)并行方式

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

假设三个业务节点每个使用50毫秒,不考虑网络等其他开销,则串行方式是150毫秒,并行的时间可能是100毫秒

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)

小结:如以上案例描述,传统的方式系统的性能(并发量、吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度也很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍


2.2 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
  • 订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.3 流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团购活动中使用广泛

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制活动的人数
  • 可以缓解时间内高流量压垮应用

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
  • 秒杀业务根据消息队列中的请求信息,再做后续处理

2.4 日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下

  • 日志采集客户端,负责日志数据采集,定时写入Kafka队列
  • Kafka消息队列,负责日志数据的接收,存储和转发
  • 日志处理应用,订阅并消费kafka队列中的日志数据

具体可查看新浪kafka日志处理应用案例:转自(http://cloud.51cto.com/art/201507/484338.htm)



二、什么是Kafka

kafka 是一个分布式的流媒体平台

apache kafka是分布式订阅、发布、消息传递的系统和强大的队列,可以处理大量数据,并是你能够将数据从一个终端传递到另一个终端

kafka适用于离线和在线消息消费

kafka消息被保留在磁盘上,并在集群内复制以防止数据丢失

kafka建立在zookeeper同步服务器之上

它与Apache Storm和Spark完美结合,实时流式传输数据分析


流媒体平台有三个关键功能

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统内
  • 以容错的持久方式存储记录流
  • 记录发生时处理流

Kafka通常用于两大类应能用

  • 构建可在系统或应用程序之间可靠获取数据的实时数据流管道
  • 构建转换或响应数据流的实时应用程序

要了解kafka如何做这些事情,我们深入讨论kafka的能力


首先是几个概念:

  • kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上(broker)
  • kafka集群以称为主题的类别存储记录流(topic)
  • 每条记录都包含一个键,一个值和一个时间戳

Kafka有四个核心API

  • producer API允许应用程序发布的记录流至一个或多个kafka的topic
  • consumer API允许应用程序订阅一个或多个topic,并处理所产生的对它们记录的数据流
  • stream API允许应用程序充当stream处理器,从一个或多个topic消耗的输入流,并产生一个输出流至一个或多个输出的topic,有效地变换输出流至输出流
  • connect API允许构建和运行kafka topic连接到现有的应用程序或数据系统中重用producer和consumer。例如,关系数据库的连接器可能捕获对表的每个更改

在kafka中,客户端和服务器之间的通信是通过简单,高性能,语言无关的TCP协议完成的。此协议已版本化并保持与旧版本的向后兼容性。kafka客户端有多种语言版本



三、Kafka 内部原理

3.1 Broker

broker是运行在kafka集群中的服务,kafka集群中可以有多个broker节点,一般节点数量为n/2+1,以保证容错性


3.2 Topic and Log

首先深入讨论Kafka为记录流提供的核心抽象 - topic

topic 是发布记录的类别或订阅源名称。Kafka的topic总是多用户的;也就是说,一个 topic 可以有零个,一个或多个consumer订阅写入它的数据。

对于每个topic,Kafka集群都维护一个如下所示的partition日志:

每一个partition都是一个有序的,不可变的记录序列,不断附加到结构化的commit log中。分区中的记录每个都分配了一个称为偏移的顺序ID号,它唯一的标识分区中的每个记录。

Kafka集群持久保存所有已发布的记录,无论是否已使用,使用可配置的保留期。例如,如果保留策略设置为2天,则在发布后的2天内,它可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。

实际上,基于每个consumer保留的唯一元数据是该consumer在日志中的偏移或位置。这种偏移由consumer控制:通常consumer在读取记录时会线性地提高其offset,但事实上,由于该位置由consumer控制,因此它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的offset来重新处理过去的数据,或者跳到最近的记录并从"现在"开始消费。


3.3 Partition

partition是物理上面的概念,每一个topic都将包含一个或多个partition

日志的partition分布在Kafka集群中的服务器上,每个服务器处理数据和请求共享partition。每个partition都在可配置数量的服务器上进行复制,以实现容错。

每个分区都有一个服务器充当"Leader",0个或多个服务器充当"followers"。leader处理分区的所有读取和写入请求,而followers被动地复制leader。如果leader失败,其中一个粉丝将自动成为新的leader。每个服务器都充当其某些分区的leader和其他服务器的followers,因此负载在集群中得到很好的平衡。


3.4 Producer

producer将数据发布到它们选择的topic。producer负责选择分配给topic中哪个分区的记录。这可以以循环方式完成,仅仅是为了平衡负载,或者可以根据一些语义区分功能(例如基于记录中的某些键)来完成。

消息的存放位置:客户端可以控制消息发送到哪个partition,有两种,一种是随机的;一种是通过自定义函数指定partition


producer load balancing

producer将数据直接发送到作为topic partition leader的代理,而不需要任何中间路由层。为了帮助producer执行此操作,所有kafka节点都可以回答有关哪些服务器处于活动的元数据请求以及topic partition的leader在任何给定时间的位置,以允许producer适当地指定其请求。

客户端控制它将消息发布到哪个partition,这可以随机完成,实现一种随机负载平衡,或者可以通过一些语义分区函数来实现。通过允许用户指定partition键,并使用这个键对分区进行散列(如果需要,还可以选择重写partition函数),我们公开了用于语义分区的接口。例如,如果选择的密钥是用户ID,那么给定用户的所有数据将被发送到同一个partition。这反过来将允许consumer对他们的消费做出地方假设。这种风格的划分是明确设计的,以允许在消费者的地方敏感的处理。


3.5 Consumer

consumer从broker拉取消息

Kafka consumer通过像broker发布"获取"请求来领导它想要消费的分区。使用者在每个请求的日志中指定其offset,并从该位置开始接收一块日志。因此,consumer可以对该位置进行重新控制,并且如果需要,可以将其倒回以重新消耗数据。

kafka也有两种模式,队列模式 (Queue) 和发布-订阅模式 (Pub)

  • 队列模式:consumer可以同时争夺消息,但是只有一个consumer消费
  • 发布-订阅模式:消息会被广播到consumers中,多个consumer可以加入一个consumer group,以组的身份共同竞争一个topic,topic的消息会被订阅的consumer group内的一个consumer消费

consumer需要维护消费的状态:消息是否被消费了,是在consumer这边记录的,这样的对不会集群和其他consumer产生影响,非常轻量级

1、每一个consumer都是有一个consumer group的,不指定则归属默认的consumer group下
2、发布订阅模式中,若所有的consumer都在一个consumer group中,那就是队列模式的效果
3、消费状态是根据offset (偏移量) 来确定的,类似于数组的下标。凭借offset,我们可以随心所欲的消费


3.6 Replicas

Kafka允许topic的每个partition拥有若干个副本,这个数量是可以配置的,你可以为每个topic partition配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

创建副本的单位是topic的partition,每个partition都有一个leader和零个或多个followers。所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers像普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否"活跃"有着清晰的定义。


Kafka判断一个节点是否活跃有两个条件

1、节点必须可以维护和zookeeper的连接,zookeeper通过心跳机制检查每个节点的连接(通过zookeeper的心跳机制)
2、如果节点是个follower,它必须能及时的同步leader的写操作,延时不能太久

Leader会跟踪所有的"同步中"的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是"太久",是由参数 replica.lag.time.max.ms决定的

只有当消息被所有的副本加入到日志中时,才算是"committed",只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。producer也可以选择是否等待消息被提交的通知,这个是由参数request.required.acks决定的。Kafka保证只要有一个"同步中"的节点,"committed"的消息就不会丢失


3.7 ISR

Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才会通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在zookeeper中维护。ISR中由f+1个节点,就可以允许在f个节点down掉的情况下不会丢失任何消息并正常提供服务。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到"同步中"的状态时,它可以重新加入ISR。这种leader的选举方式是非常快速的,适合kafka的应用场景。

如果所有节点都down掉了怎么办?kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一点所有节点都down了,这个就不能保证了。


3.8 Consumer Position

令人惊讶的是,跟踪已消耗的信息是消息传递系统的关键性能点之一。

大多数消息传递系统都保留关于在broker上消耗了哪些消息的元数据。也就是说,当消息被分发给consumer时,broker要么立即在本地记录该事实,要么等待来自consumer的确认。这是一个相当直观的选择,而且对于一个单一的机器服务器来说,不清楚这个状态还能去向何处。由于在许多消息传递系统中用于存储的数据结构伸缩性很差,因此这也是一种实用的选择——因为broker知道消耗了什么,所以可以立即删除它,从而保持数据大小较小。

也许不显而易见的是,让broker和consumer就所消费的东西达成一致并不是一个小问题。如果broker在每次通过网络分发消息时都立即记录所消耗的消息,那么如果consumer未能处理该消息(例如因为它崩溃或请求超时或其他原因),则该消息将丢失。为了解决这个问题,许多消息传递系统添加了确认特性,这意味着消息在发送时只标记为已发送,而不被使用;broker等待来自consumer的特定确认来记录消息为已使用。这一策略解决了丢失消息的问题,但产生了新的问题。首先,如果consumer处理该消息,但是在发送确认之前失败,则该消息将被消耗两次。第二个问题是关于性能的,现在broker必须对每个消息保持多个状态(首先锁定它,这样就不会第二次发出,然后将其标记为永久消耗,这样就可以删除它)。棘手的问题必须处理,比如如何处理发送的消息,但从未被确认。

kafka的处理方式不同。我们的topic被划分为一组完全有序的partition,每个partition在任何给定时间都由每个订阅consumer group内的一个consumer使用。这意味着每个partition中的consumer的位置只是一个整数,是下一个要消费的消息的offset。这使得状态被消耗的非常小,每个partition只有一个数字。这种状态可以定期检查。这使得等效的消息确认非常便宜。

这个决定有一个附带的好处。consumer可以故意重返旧的offset并重新消费数据。这违反了一个队列的共同合同,但最终成为许多消费者的基本特征。例如,如果使用者代码有一个bug,并且在一些消息被使用之后被发现,那么使用者可以在bug被修复之后重新使用这些消息。


3.9 Message Delivery Semantics

现在我们已经了解了一些producer和consumer是如何工作的,让我们讨论一下Kafka在生产者和消费者之间提供的语义保证。显然,可以提供多个可能的消息传递保证:

  • 最多一次:一旦消息可能丢失,但永远不会重发。
  • 至少一次:至少一次消息永远不会丢失,但可能会被重发。
  • 完全一次:确切地说,这是人们真正想要的,每一条消息一次只传递一次。

值得注意的是,这分为两个问题:发布消息的持久性保证和消耗消息时的保证。

许多系统声称提供"完全一次"的交付语义,但重要的是阅读细则,大多数这些说法具有误导性(即它们不会转化为消费者或生产者可能失败的情况,有多个情况消费者进程,或写入磁盘的数据可能丢失的情况)。

kafka的语义学是直截了当的。在发布消息时,我们有一个消息"committed"到日志的概念。一旦提交了已发布的消息,只要复制此消息所写入的分区的broker保持"alive"状态,就不会丢失它。提交消息的定义、活动分区以及我们试图处理哪种类型的故障的描述将在下一节中更详细地描述。现在让我们假设一个完美的,无损的broker,试图了解生产者和消费者的保证。如果生产者试图发布消息并遇到网络错误,则无法确定此错误是否发生在消息提交之前或之后。这类似于用自动生成的关键字插入到数据库表中的语义。

在0.11.0.0之前,如果生产者未能接收到指示提交消息的响应,则它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已经成功,则在重发期间可以将消息再次写入日志。从0.11.0.0开始,Kafka生产者还支持幂等传递选项,该选项保证重发不会导致日志中的重复条目。为了实现这一点,broker为每个生产者分配一个ID,并使用生产者与每个消息一起发送的序列号来删除重复消息。同样从0.11.0.0开始,生产者支持使用类似于事务的语义向多个topic partition发送消息的能力:即,要么所有消息都被成功写入,要么没有消息被写入。主要的用例是kafka topic之间的一次处理(如下所述)。

并非所有的用例都需要这样强有力的保证。对于延迟敏感的使用,我们允许生产者指定它所期望的耐久性水平。如果生产者指定它希望等待提交的消息,则可以采取10ms的顺序。然而,生产者也可以指定它希望完全异步地执行发送,或者它只想等到领导者(但不一定是追随者)有消息

现在让我们从consumer的角度来描述语义学。所有replicas具有完全相同的log,具有相同的offset。consumer控制它在这个log中的位置。如果使用者从未崩溃,那么它只能把这个位置存储在内存中,但是如果使用者失败了,并且我们希望这个topic partition被另一个进程接管,那么新的进程将需要选择一个适当的位置来开始处理。假设consumer阅读一些消息——它有几个选项来处理消息和更新它的位置。

  1. 它可以读取消息,然后保存其在日志中的位置,并最终处理消息。在这种情况下,有可能使用者进程在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息尚未被处理。这对应于"最多一次"语义,如在消费者失败消息的情况下可能不被处理。
  2. 它可以读取消息,处理消息,并最终保存其位置。在这种情况下,消费者进程在处理消息之后但在保存其位置之前有可能崩溃。在这种情况下,当新进程接管时,它接收的前几条消息就已经被处理了。这对应于在消费者失败的情况下的"至少一次"语义。在许多情况下,消息具有主键,因此更新是等幂的(两次接收相同的消息只是用自身的另一个replicas覆盖记录)。它可以读取消息,处理消息,并最终保存其位置。在这种情况下,consumer进程在处理消息之后但在保存其位置之前有可能崩溃。在这种情况下,当新进程接管时,它接收的前几条消息就已经被处理了。这对应于在消费者失败的情况下的"至少一次"语义。在许多情况下,消息具有主键,因此更新是等幂的(两次接收相同的消息只是用自身的另一个replicas覆盖记录)。

那么,确切地说一次语义(即你真正想要的东西)呢?当从一个Kafka topic消费并生产到另一个topic(如在Kafka Streams应用程序中)时,我们可以利用上面提到的0.11.0.0中的新事务生产者功能。使用者的位置作为消息存储在一个topic中,因此我们可以在与接收处理数据的输出topic相同的事务中将offset写入Kafka。如果事务被中止,则使用者的位置将恢复到其旧值,并且根据其他使用者的“隔离级别”,输出topic上生成的数据对其他使用者不可见。如果它们是已中止的事务的一部分,但是在“read_.”中,使用者将只从已提交的事务(以及不属于事务的任何消息)返回消息。

当写入外部系统时,限制在于需要将消费者的位置与实际存储为输出的位置进行协调。实现这一点的经典方法是在消费者位置的存储和消费者输出的存储之间引入两阶段提交。但是,通过让消费者将其偏移量存储在与输出相同的位置,可以更简单、更一般地处理这一问题。这是更好的,因为许多消费者可能想要写入的输出系统将不支持两阶段提交。作为示例,考虑一个Kafka Connect连接器,它填充HDFS中的数据以及它所读取的数据的偏移量,从而保证数据和偏移量都更新或者都不更新。对于许多其他的数据系统,我们遵循类似的模式,这些模式需要这些更强的语义,并且消息没有允许重复的主要键。

因此,Kafka有效地支持Kafka Streams中的精确一次交付,并且事务性生产者/消费者通常可用于在Kafka主题之间传输和处理数据时提供精确一次交付。其他目的地系统的精确一次交付通常需要与此类系统合作,但是Kafka提供了偏移量,这使得实现此是可行的(也参见Kafka Connect)。否则,Kafka在默认情况下保证至少进行一次传递,并且允许用户通过禁用对生产者的重试并在处理一批消息之前在消费者中提交偏移来实现最多一次传递。


3.10 Ack机制

Kafka保证消息的送达,提供了三种ACK级别
1、当ack=0,这意味着producer发送出消息即送达,不等待服务器的确认。这种情况下数据传输效率最高,但是数据可靠性确是最低的
2、当ack=1,表示producer写leader成功,broker就返回成功,没有等待所有followers写入成功(默认)
3、当ack=-1时,producer需要等待ISR中的所有副本都确认接收到数据后才算一次完成,可靠性最高。但是这样也不能100%保证数据不丢失,比如当ISR中只有一个leader时(ISR中的成员由于某些情况会增加也会减少,可能最少就只剩一个leader),这样就变成了ack=1的情况

补充:

(1)当ack=1,一旦有某个broker宕机导致Partition Follow和leader切换,可能会导致丢数据
(2)如果一定要高可靠性,则设置ack=-1且同时设置最小副本数大于1
(3)若不满足ACK,produc会抛出异常 NotEnoughReplicas or NotEnoughReplicasAfterAppend



四、Kafka 配置参数

4.1 Broker Configure

(1)listeners 地址

listeners=PLAINTEXT://10.80.227.169:9092

(2)集群ID
每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。如果未设置,则会生成唯一的代理标识。为避免zookeeper生成的代理标识与用户配置的代理标识之间的冲突,生成的代理标识从 reserved.broker.max.id + 1开始。

broker.id = -1 (default)

(3)broker 最大消息

broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起

message.max.bytes = 1000012 (default)

(4)request timeout ms

在向producer发送ack之前,broker允许等待的最大时间,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) ,客户端将在必要时重新发送请求,或者如果重试耗尽,则请求失败

request.timeout.ms = 30000 (default)

4.2 Topic Configure

(1)Topic 默认分区

新建Topic时默认的分区数

num.partitions	= 1 (default)

(2)自动创建topic

是否允许自动创建topic。如果设为true,那么produce,consume或者fetch metadata一个不存在的topic时,就会自动创建一个默认replication factor和partition number的topic。

auto.create.topics.enable = false (default => true)

(3)删除topic

启用删除topic。如果关闭此配置,则通过管理工具删除topic将不起作用
注意:kafka-manager 删除topic依赖这个配置

delete.topic.enable = true (default)

(4)Topic Partition 平衡 Leader

每当broker停止或崩溃领导时,该broker的topic partition转移到其他replicas。这意味着默认情况下,当broker重新启动时,它将只是所有partition的follower,这意味着它不会用于客户端读取和写入。

为了避免这种不平衡,kafka有一个首选复制品的概念。如果partition的replicas列表是1,5,9,则节点1首选作为节点5或9的leader,因为它在replicas列表中较早。你可以让Kafka群集通过运行命令尝试恢复已恢复replicas的leader:

参数一:如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership
参数二:partition rebalance 检查的频率,由控制器触发,默认为300秒
参数三:每个broker允许的不平衡的leader的百分比。如果每个broker超过了这个百分比,复制控制器会对分区进行重新的平衡。该值以百分比形式指定,默认为10%

auto.leader.rebalance.enable = true (default)
leader.imbalance.check.interval.seconds = 300	(default)
leader.imbalance.per.broker.percentage	= 10 (default)

4.3 Thread Configure

(1)后台任务处理线程数

一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

background.threads = 10 (default)

(2)I/O 线程

server端处理请求时的I/O线程的数量,不要小于磁盘的数量。

num.io.threads = 8 (default)

(3)queue max request

I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。这个参数是指定用于缓存网络请求的队列的最大容量,这个队列达到上限之后将不再接收新请求。一般不会成为瓶颈点,除非I/O性能太差,这时需要配合num.io.threads等配置一同进行调整

queued.max.requests = 500 (default)

(4)网络线程

服务器用于接收来自网络的请求并向网络发送响应的线程数

num.io.threads = 3 (default)

(5)数据目录线程

每个数据目录的线程数,用于启动时的日志恢复和关闭时的刷新

num.recovery.threads.per.data.dir	= 1 (default)

(6)日志目录移动副本线程

num.replica.alter.log.dirs.threads = null (default)

(7)leader replicas线程数

用来从leader复制消息的线程数量,增大这个值可以增加follower的I/O并行度。

num.replica.fetchers = 1  (default)

4.4 Compress Configure

(1)压缩类型

producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好

为topic指定一个压缩类型,此配置接受标准压缩编码(‘gzip’,‘snappy’,‘lz4’),另外接受’uncompressed’相当于不压缩, ‘producer’ 意味着压缩类型由producer指定

compression.type = producer (default)

4.5 Log Configure

(1)日志路径

log.dirs = /tmp/kafka-logs (defaults)

(2)日志持久化

参数一:在将消息刷新到磁盘之前,在日志分区上累积的消息数量。强制fsync一个分区的log文件之前暂存的消息数量。因为磁盘IO操作是一个慢操作,但又是一个“数据可靠性”的必要手段,所以检查是否需要固化到硬盘的时间间隔。需要在“数据可靠性”与“性能”之间做必要的权衡,如果此值过大,将会导致每次“fsync”的时间过长(IO阻塞),如果此值过小,将会导致”fsync“的次数较多,这也就意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。通常建议使用replication来确保持久性,而不是依靠单机上的fsync

参数二:任何主题中的消息在刷新到磁盘之前都保留在内存中的最长时间(以毫秒为单位)。 如果未设置,则使用log.flush.scheduler.interval.ms中的值

参数三:记录上次把日志刷到磁盘的时间点的频率,用来日后的恢复。通常不需要改变。

参数四:更新记录起始偏移量的持续记录的频率

log.flush.interval.messages = 9223372036854775807 (default)
log.flush.interval.ms = null (default)
log.flush.offset.checkpoint.interval.ms = 60000 (default)
log.flush.start.offset.checkpoint.interval.ms = 60000 (default)

(3)日志清理策略

默认情况下启用日志清理程序。这将启动更干净的线程池。要在特定主题上启用日志清理,您可以添加特定于日志的属性

可选择delete或compact,如果设置为delete,当log segment文件的大小达到上限,或者roll时间达到上限,文件将会被删除。如果设置成compact,则此文件会被清理,标记成已过时状态,详见 4.8 。此配置可以被覆盖。

log.cleanup.policy = delete (default)

日志保留的最小时间,因为时定时检查的,所以不是精确时间(单位:小时)

log.retention.hours = 168 (default)

日志达到删除大小的阈值。每个topic下每个分区保存数据的最大文件大小;注意,这是每个分区的上限,因此这个数值乘以分区的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。注意,这项设置可以由每个topic设置时进行覆盖。-1为不限制。

log.retention.bytes = -1 (default)

(4)压缩日志清理策略
启用日志清理器进程在服务器上运行。使用了cleanup.policy = compact的主题,包括内部offsets主题,都应该启动该选项。如果被禁用的话,这些topic将不会被压缩,并且会不断增长。

log.cleaner.enable = true (default)

对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。

log.cleaner.delete.retention.ms = 86400000 (default)

(5)分区segment设置
topic 分区的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的,这就是段文件(segment file);一个topic的一个分区对应的所有segment文件称为log。这个设置控制着一个segment文件的最大的大小,如果超过了此大小,就会生成一个新的segment文件。此配置可以被覆盖。

log.segment.bytes = 1073741824	(default)

这个设置会强制Kafka去新建一个新的log segment文件,即使当前使用的segment文件的大小还没有超过log.segment.bytes。此配置可以被覆盖

log.roll.hours = 168

每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。

log.index.size.max.bytes = 10485760 (default)

检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求(单位:毫秒)

log.retention.check.interval.ms = 300000 (default)

4.6 Offset Configure

(1)offset commit ack
可以接受consumer提交之前所需的ack。通常,不应覆盖默认值(-1)

offsets.commit.required.acks = -1

(2)offset commit timeout
consumer 提交的延迟时间,offsets提交将被延迟,直到偏移量topic的所有副本都收到提交或达到此超时。 这与producer请求超时类似。

offsets.commit.timeout.ms	= 5000  (default)

(3)offset load buffer size
每次从offset段文件往缓存加载offsets数据时的读取的数据大小。默认5M

offsets.load.buffer.size = 5242880	(default)

(4)offset retention check interval
用于定期检查offset过期数据的检查周期

offsets.retention.check.interval.ms = 600000	(default)

(5)offsets retention minutes
如果一个group在这个时间没没有提交offsets,则会清理这个group的offset数据

offsets.retention.minutes	= 10080 (default)

(6)offset topic compression codec
offsets.topic.compression.codec仅适用于内部偏移主题(即__consumer_offsets)。对这些用户级topic没用

offsets.topic.compression.codec	= 0 (default)

(7)offset topic num partition
offsets topic的分区数量(部署后不应更改)

offsets.topic.num.partitions = 50 (default)

(8)offset topic replcas factor
offset topic的replicas数量(设置更高以确保可用性)。在集群大小满足此复制因子要求之前(例如set为3,而broker不足3个),内部topic创建将失败

offsets.topic.replication.factor = 3 (default)

(9)offset topic segment bytes
为了便于更快的日志压缩和缓存加载,offset的topic segment字节应保持相对较小

offsets.topic.segment.bytes = 104857600 (default)

4.7 Quota Configure

(1)quota consumer default

只有在动态默认配额没有配置或者为Zookeeper时才使用。 如果一个消费者每秒消费的数据量大于此值,则暂时不会再允许消费。0.9版本新加(单位为bytes)

quota.consumer.default = 9223372036854775807 (default)

(2)quota producer default

只有在动态默认配额没有配置或者为Zookeeper时才使用。如果一个生产者每秒产生的数据大于此值,则暂时会推迟接受生产者数据

quota.producer.default = 9223372036854775807	(default)

4.8 Replica Configure

(1)replica fetch min bytes
复制数据过程中,replica收到的每个fetch响应,期望的最小的字节数,如果没有收到足够的字节数,就会等待期望更多的数据,直到达到replica.fetch.wait.max.ms

replica.fetch.min.bytes = 1 (default)

(2)replica fetch wait max ms
follower replicas 发布的每个fetcher请求的最长等待时间。此值应始终小与 replica.log.time.max.ms,以防止针对低吞吐量topic频繁收缩ISR

replica.fetch.wait.max.ms = 500

(3)replica lag time max ms
如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了

replica.lag.time.max.ms = 10000

(4)replica high watermark checkpoint interval ms
每一个replica follower存储自己的high watermark到磁盘的频率,用来日后的recovery。

replica.high.watermark.checkpoint.interval.ms = 5000

(5)replica socket receive buffer bytes
复制数据过程中,follower发送网络请求给leader的socket receiver buffer的大小。

replica.socket.receive.buffer.bytes = 65536 (default)

(6)replica socket timeout ms
leader 备份数据时的socket网络请求的超时时间,它的值至少应该是replica.fetch.wait.max.ms

replica.socket.timeout.ms = 30000 (default)

(7)副本写入保证

min.insync.replicas	= 1 (defaults)

当生产者将ack设置为"all"(或"-1")时,min.insync.replicas指定必须确认写入被认为成功的最小副本数(必须确认每一个replicas的写数据都是成功的)。 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并且生产者使用“all”选项。 这将确保如果大多数副本没有写入生产者则抛出异常。

(8)unclean leader election enable
指明了是否能够使不在ISR中replicas follower设置用来作为leader

unclean.leader.election.enable = false (default)

4.9 Socket Configure

(1)socket request max bytes
server能接受的请求的最大的大小,这是为了防止server跑光内存,不能大于Java堆的大小

socket.request.max.bytes = 104857600	(default)

(2)socket send buffer bytes
server端用来处理socket连接的SO_SNDBUFF缓冲大小。如果值为-1,则将使用操作系统默认值。

socket.send.buffer.bytes = 102400

4.10 Transacton Configure

(1)transaction max timeout ms
事务的最大允许超时时间。 如果客户请求的事务时间超过这个时间,那么broker将在InitProducerIdRequest中返回一个错误。 这样可以防止客户超时时间过长,从而阻碍消费者读取事务中包含的主题

transaction.max.timeout.ms = 900000 (default)

(2)transaction state log load buffer size
将生产者ID和事务加载到缓存中时,从事务日志段(the transaction log segments)读取的批量大小。

transaction.state.log.load.buffer.size = 5242880 (default)

(3)transaction state log min isr
覆盖事务主题的min.insync.replicas配置,在min.insync.replicas中,replicas数量为1,该参数将默认replicas定义为2

transaction.state.log.min.isr = 2 (default)

(4)transaction state log num partitions
事务主题的分区数量(部署后不应更改)。

transaction.state.log.num.partitions = 50 (default)

(5)transaction state log replication factor
事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求

transaction.state.log.replication.factor = 3 (default)

(6)transaction.state.log.segment.bytes
事务主题段字节应保持相对较小,以便更快地进行日志压缩和缓存加载

transaction.state.log.segment.bytes = 104857600 (default)

(7)transactional id expiration ms
事务协调器在未收到任何事务状态更新之前,主动设置生产者的事务标识为过期之前将等待的最长时间(以毫秒为单位)

transactional.id.expiration.ms	= 604800000 (default)

4.11 Zookeeper Configure

(1)zookeeper 连接地址

zookeeper.connect=10.80.227.169:2181,10.80.238.7:2181,10.81.55.98:2181

(2)zookeeper connection timeout

连接到ZK server的超时时间,没有配置就使用 zookeeper.session.timeout.ms

zookeeper.connection.timeout.ms = null (default)

(3)zookeeper session timeout ms

Zookeeper会话超时时间

zookeeper.session.timeout.ms = 6000 (default)

(4)zookeeper max in flight requests

此KIP建议调用一个新的代理配置zookeeper.max.in.flight.requests ,它代表客户端在阻塞之前将发送给ZooKeeper的最大未确认请求数。

此配置必须至少设置为1。

默认值设置为10.我们运行实验,显示zookeeper.max.in.flight.requests 各种ZooKeeper密集型控制器协议对完成时间的影响。选择默认值是实验结果发现收益递减的最小数字。

zookeeper.max.in.flight.requests = 10 (default)

(5)zookeeper set acl

连接zookeeper是否使用ACLs安全验证

zookeeper.set.acl = false (default)

4.12 Other Configure

(1)优雅的关机

Kafka群集将自动检测任何代理关闭或故障,并为该计算机上的partition选择新的leader。无论服务器是故障还是故意关闭以进行维护或配置更改,都会发生这种情况。对于后一种情况,Kafka支持更优雅的机制来停止服务器,而不仅仅是杀死服务器。当服务器正常停止时,它有两个优化,它将利用:
它会将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。日志恢复需要时间,因此加速了故意重启。
在关闭之前,它会将服务器所leader的任何partition迁移到其他replicas。这将使leader转移更快,并将每个partition不可用的时间缩短到几毫秒。

每当服务器停止而不是硬杀死时,将自动同步日志,但受控leader迁移需要使用特殊设置:

controlled.shutdown.enable = true (default)

请注意,只有在代理上托管的所有分区都具有replicas(即复制因子大于1 且至少其中一个副本处于活动状态)时,受控关闭才会成功。这通常是你想要的,因为关闭最后一个replicas会使该topic partition不可用。

(2)consumer metadata size

consumer保留offset信息的最大空间大小

offset.metadata.max.bytes = 4096


五、Kafka Python Client

5.1 安装 kafka-python

下载地址:https://github.com/dpkp/kafka-python


5.2 kafka python producer
#!/usr/local/bin/python3.6
# -*- coding:utf-8 -*-

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

import json

class Kafka_producer():
    ''' 使用kafka-python的producer模块 '''

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkaTopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers = 'kafka_host:kafka_port'.format(
            kafka_host = self.kafkaHost,
            kafka_port = self.kafkaPort
            ))

    def sendjsondata(self, params):
        try:
            params_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkaTopic, params_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print(e)

def main_producer():
    ''' 测试producer '''
    producer = Kafka_producer("10.80.227.169", 9092, "wanglei")
    for id in range(10):
        params = 'abtest:%s' % str(id)
        producer.sendjsondata(params)

if __name__ == '__main__':
    main_producer()

5.2 kafka python consumer
#!/usr/local/bin/python3.6
# -*- coding:utf-8 -*-

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

import json

class Kafka_consumer():
    '''  使用kafka-python的consumer模块 '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkaTopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkaTopic, group_id = self.groupid, bootstrap_servers = 'kafka_host:kafka_port'.format(
            kafka_host = self.kafkaHost,
            kafka_port = self.kafkaPort
        ))

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)

def main_consumer():
    ''' 测试consumer '''
    consumer = Kafka_consumer("10.80.227.169", 9092, "wanglei", "test-python-ranktest")
    message = consumer.consume_data()
    for i in message:
        print(i)


if __name__ == '__main__':
    main_consumer()


六、Kafka-manager

kafka manager 用于管理 apache kafka


6.1 说明
  • 管理多个集群
  • 检查集群状态(topic,consumer,offset,broker,partition,replica)
  • 首选replica选举
  • 使用选项partition分配以选择要使用的broker
  • partition重分配(基于生成的分配)
  • 基于可选的topic配置进行创建
  • 删除topic
  • topic list及查看删除的topic
  • 为已存在的topic增加partition
  • 为已存在的topic更新配置
  • 批量生成多个topic的partition分配,并可选择要使用的broker
  • 批量运行重新分配多个topic的partition
  • 将partition添加到现有topic
  • 更新现在topic的配置
  • (可选)为代理级别和主题级别度量标准启用JMX轮询。

6.2 项目地址

https://github.com/yahoo/kafka-manager


6.3 要求
  • kafka 0.8 +
  • Java 8+

6.4 安装kafka-manager

(1)安装 sbt

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
mv bintray-sbt-rpm.repo /etc/yum.repos.d/
yum -y install sbt

(2)安装 kafka-manager

git clone https://github.com/yahoo/kafka-manager.git
cd kafka-manager
sbt clean dist

(3)解压缩并启动

说明:如果kafka-manager需要开启jmx,则必须在kafka中添加JMX_PORT配置

cd /usr/local/kafka-manager/target/universal
unzip kafka-manager-1.3.3.21.zip
cd /usr/local/kafka-manager/target/universal/kafka-manager-1.3.3.21
./bin/kafka-manager -Dconfig.file=/usr/local/kafka-manager/conf/application.conf  2>&1 &

(4)kafka-manager 配置

play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=$?APPLICATION_SECRET
play.i18n.langs=["en"]
play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader
kafka-manager.zkhosts="10.80.227.169:2181,10.80.238.7:2181,10.81.55.98:2181"
kafka-manager.zkhosts=$?ZK_HOSTS
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
akka 
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"

akka.logger-startup-timeout = 60s
basicAuthentication.enabled=false
basicAuthentication.enabled=$?KAFKA_MANAGER_AUTH_ENABLED
basicAuthentication.username="admin"
basicAuthentication.username=$?KAFKA_MANAGER_USERNAME
basicAuthentication.password="password"
basicAuthentication.password=$?KAFKA_MANAGER_PASSWORD
basicAuthentication.realm="Kafka-Manager"
basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification
kafka-manager.consumer.properties.file=$?CONSUMER_PROPERTIES_FILE

(5)kafka-manager管理界面



六、Kafka集群

https://blog.csdn.net/wanglei_storage/article/details/82759963



七、文章及案例