《Kafka权威指南》摘抄
Posted 敲代码的小小酥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Kafka权威指南》摘抄相关的知识,希望对你有一定的参考价值。
1.根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。
2.在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。
3.Kafka 也支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读取,其他客户端就无法再读取它。
4.Kafka 有很多配置选项,涉及安装和调优的方方面面。不过大多数调优选项可以使用默认配置,除非你对调优有特别的要求。
5.log.dirs可以配置多个日志路径,如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
6.num.partitions配置:果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。
7.拥有大量消息的主题如果要进行负载分散,就需要大量的分区。
8.如何选定分区数量
为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。
主题需要达到多大的吞吐量?例如,是希望每秒钟写入 100KB 还是 1GB ?
从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒 50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒 50MB。
可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。
每个 broker 包含的分区个数、可用的磁盘空间和网络带宽如果消息是按照不同的键来写入分区的,那么为已有的主题新增分区就会很困难。
单个 broker 对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间也越长。
很显然,综合考虑以上几个因素,你需要很多分区,但不能太多。如果你估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。也就是说,如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。
如果不知道这些信息,那么根据经验,把分区的大小限制在 25GB 以内可以得到比较理想的效果。
9.不建议把 Zookeeper 共享给其他应用程序。Kafka 对 Zookeeper 的延迟和超时比较敏感,与 Zookeeper 群组之间的一个通信异常就可能导致 Kafka 服务器出现无法预测的行为。这样很容易让多个 broker 同时离线,如果它们与 Zookeeper 之间断开连接,也会导致分区离线。这也会给集群控制器带来压力,在服务器离线一段时间之后,当控制器尝试关闭一个服务器时,会表现出一些细小的错误。
10.Kafka 还提供了二进制连接协议,也就是说,我们直接向 Kafka 网络端口发送适当的字节序列,就可以实现从 Kafka 读取消息或往 Kafka 写入消息。还有很多用其他语言实现的 Kafka 客户端,比如 C++、 Python、Go 语言等,它们都实现了 Kafka 的连接协议,使得 Kafka 不仅仅局限于在 Java 里使用。这些客户端不属于 Kafka 项目,不过 Kafka 项目 wiki 上提供了一个清单,列出了所有可用的客户端。
11.如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
12.kafka生产者使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
13.生产者batch.size属性:该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。
14.默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息.把 linger.ms设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。
15.在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
16.分配分区是怎样的一个过程
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor
接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka 内置了两种分配策略,在后面的配置参数小节我们将深入讨论。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
17.一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
18.poll()方法的参数是一个超时时间,用于控制 poll()
方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll()会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
理解:消费者取回的数据会存入消费者缓冲区里,等阻塞时间到了后,再返回给轮询列表。
poll()方法有一个超时参数,它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。
max.poll.records
该属性用于控制单次调用 call()
方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
19.手动提交偏移量之同步提交和异步提交
同步提交:处理完当前批次的消息,在轮询更多的消息之前,调用 commitSync()
方法提交当前批次最新的偏移量。只要没有发生不可恢复的错误,commitSync()
方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里。 broker 对提交请求作出回应之前,应用程序会一直阻塞
异步提交:不会一直重试,它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。
20.一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
如果一切正常,我们使用 commitAsync()
方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。
如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync()
方法会一直重试,直到提交成功或发生无法恢复的错误。
21.分区再均衡监听器
onPartitionsRevoked方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
onPartitionsAssigned 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync()
方法,确保在再均衡发生之前提交偏移量
22.复制功能是 Kafka 架构的核心。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。
23.生产请求和获取请求都必须发送给分区的首领副本。如果 broker 收到一个针对特定分区的请求,而该分区的首领在另一个 broker 上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的 broker 上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的 broker 上。
那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求
。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个 broker,因为所有 broker 都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标 broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms
参数来配置),从而知道元数据是否发生了变更——比如,在新 broker 加入集群时,部分副本会被移动到新的 broker 上(如图 5-2 所示)。另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的 broker 上。
24.消费者客户端还可以指定 broker 最多可以从一个分区里返回多少数据。这个限制是非常重要的,因为客户端需要为 broker 返回的数据分配足够的内存。如果没有这个限制,broker 返回的大量数据有可能耗尽客户端的内存。
25.kafka 使用零复制技术向消费者客户端发送消息——也就是说,Kafka 直接把消息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是 Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。
26.因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段
。默认情况下,每个片段包含 1GB 或一周的数据,以较小的那个为准。在 broker 往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。
当前正在写入数据的片段叫作活跃片段
。活动片段永远不会被删除,所以如果你要保留数据 1 天,但片段里包含了 5 天的数据,那么这些数据会被保留 5 天,因为在片段被关闭之前这些数据无法被删除。
27.Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。感兴趣的可以自己研究一下。
28.如果只为每个键保留最近的一个消息,那么当需要删除某个特定键所对应的所有消息时,我们该怎么办?这种情况是有可能发生的,比如一个用户不再使用我们的服务,那么完全可以把与这个用户相关的所有信息从系统中删除。
为了彻底把一个键从系统里删除,应用程序必须发送一个包含该键且值为 null
的消息。清理线程发现该消息时,会先进行常规的清理,只保留值为 null
的消息。该消息(被称为墓碑消息
)会被保留一段时间,时间长短是可配置的。在这期间,消费者可以看到这个墓碑消息,并且发现它的值已经被删除。于是,如果消费者往数据库里复制 Kafka 的数据,当它看到这个墓碑消息时,就知道应该要把相关的用户信息从数据库里删除。在这个时间段过后,清理线程会移除这个墓碑消息,这个键也将从 Kafka 分区里消失。重要的是,要留给消费者足够多的时间,让他看到墓碑消息,因为如果消费者离线几个小时并错过了墓碑消息,就看不到这个键,也就不知道它已经从 Kafka 里删除,从而也就不会去删除数据库里的相关数据了。
29.手动提交偏移量,消费者客户端处理消息失败后,后面的消息处理成功,后面的偏移量覆盖失败消息偏移量的解决方案:
第一种模式,在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把还没有处理好的消息保存到缓冲区里(这样下一个轮询就不会把它们覆盖掉),调用消费者的 pause()
方法来确保其他的轮询不会返回数据(不需要担心在重试时缓冲区溢出),在保持轮询的同时尝试重新处理。如果重试成功,或者重试次数达到上限并决定放弃,那么把错误记录下来并丢弃消息,然后调用 resume()方法让消费者继续从轮询里获取新数据。
第二种模式,在遇到可重试错误时,把错误写入一个独立的主题,然后继续。一个独立的消费者群组负责从该主题上读取错误消息,并进行重试,或者使用其中的一个消费者同时从该主题上读取错误消息并进行重试,不过在重试时需要暂停该主题。这种模式有点像其他消息系统里的 dead-letter-queue。
30.长时间处理:
有时候处理数据需要很长时间:你可能会从发生阻塞的外部系统获取信息,或者把数据写到外部系统,或者进行一个非常复杂的计算。要记住,暂停轮询的时间不能超过几秒钟。即使不想获取更多的数据,也要保持轮询,这样客户端才能往 broker 发送心跳。在这种情况下,一种常见的做法是使用一个线程池来处理数据,因为使用多个线程可以进行并行处理,从而加快处理速度。在把数据移交给线程池去处理之后,你就可以暂停消费者,然后保持轮询,但不获取新数据,直到工作线程处理完成。在工作线程处理完成之后,可以让消费者继续获取新数据。因为消费者一直保持轮询,心跳会正常发送,就不会发生再均衡。
31.仅一次传递:
有些应用程序不仅仅需要“至少一次”(at-least-once)语义(意味着没有数据丢失),还需要“仅一次”(exactly-once)语义。尽管 Kafka 现在还不能完全支持仅一次语义,消费者还是有一些办法可以保证 Kafka 里的每个消息只被写到外部系统一次(但不会处理向 Kafka 写入数据时可能出现的重复数据)。
实现仅一次处理最简单且最常用的办法是把结果写到一个支持唯一键的系统里,比如键值存储引擎、关系型数据库、ElasticSearch 或其他数据存储引擎。在这种情况下,要么消息本身包含一个唯一键(通常都是这样),要么使用主题、分区和偏移量的组合来创建唯一键——它们的组合可以唯一标识一个 Kafka 记录。如果你把消息和一个唯一键写入系统,然后碰巧又读到一个相同的消息,只要把原先的键值覆盖掉即可。数据存储引擎会覆盖已经存在的键值对,就像没有出现过重复数据一样。这个模式被叫作幂等性写入,它是一种很常见也很有用的模式。
如果写入消息的系统支持事务,那么就可以使用另一种方法。最简单的是使用关系型数据库,不过 HDFS 里有一些被重新定义过的原子操作也经常用来达到相同的目的。我们把消息和偏移量放在同一个事务里,这样它们就能保持同步。在消费者启动时,它会获取最近处理过的消息偏移量,然后调用 seek()
方法从该偏移量位置继续读取数据。
32.对于消费者来说,最重要的指标是 consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想情况下,该指标总是为 0,消费者总能读到最新的消息。不过在实际当中,因为 poll()
方法会返回很多消息,消费者在获取更多数据之前需要花一些时间来处理它们,所以该指标会有些波动。关键是要确保消费者最终会赶上去,而不是越落越远。因为该指标会正常波动,所以在告警系统里配置该指标有一定难度。Burrow 是 LinkedIn 公司开发的一个 consumer-lag 检测工具,它可以让这件事情变得容易一些。
33.在使用 Kafka 构建数据管道时,通常有两种使用场景:第一种,把 Kafka 作为数据管道的两个端点之一,例如,把 Kafka 里的数据移动到 S3 上,或者把 MongoDB 里的数据移动到 Kafka 里;第二种,把 Kafka 作为数据管道两个端点的中间媒介,例如,为了把 Twitter 的数据移动到 ElasticSearch 上,需要先把它们移动到 Kafka 里,再将它们从 Kafka 移动到 ElasticSearch 上。
Kafka 为数据管道带来的主要价值在于,它可以作为数据管道各个数据段之间的大型缓冲区,有效地解耦管道数据的生产者和消费者。Kafka 的解耦能力以及在安全和效率方面的可靠性,使它成为构建数据管道的最佳选择。
以上是关于《Kafka权威指南》摘抄的主要内容,如果未能解决你的问题,请参考以下文章