kafka官方文档中文翻译(kafka参数解释)

Posted 墨痕诉清风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka官方文档中文翻译(kafka参数解释)相关的知识,希望对你有一定的参考价值。

目录

入门

1.1简介

kafka™是一个分布式流媒体平台。这到底意味着什么?

1.2使用案例

1.3快速入门

1.4生态系统

1.5从以前的版本升级

2. API

2.1生产者API

2.2消费者API

2.3 Streams API

2.4连接API

2.5遗留API

3.配置

3.1Broker配置

3.2生产者配置

3.3消费者配置

3.4 Kafka连接配置

3.5 Kafka Streams配置

4.设计

4.1动机

4.2持久性

4.3效率

4.4生产者

4.5消费者

4.6消息传递语义

4.7复制

4.8日志压缩

4.9配额

5.实施

5.1 API设计

5.2网络层

5.3消息

5.4消息格式

5.5 Log

5.6分发

6.操作

6.1基本Kafka操作

6.2数据中心

6.3 Kafka配置

6.4 Java版本

6.5硬件和操作系统

6.6监测

6.7 ZooKeeper

7.安全

7.1安全概述

7.2使用SSL加密和验证

7.3使用SASL进行验证

7.4授权和ACL

7.5在正在运行的集群中合并安全功能

7.6 ZooKeeper认证

8. KAFKA CONNECT

8.1概述

8.2用户指南

8.3连接器开发指南

9.kafka流


入门

1.1简介

kafka™是一个分布式流媒体平台。这到底意味着什么?

我们认为流媒体平台具有三个关键功能:

  1. 它允许您发布和订阅记录流。在这方面,它类似于消息队列或企业消息系统。
  2. 它允许您以容错方式存储记录流。
  3. 它允许您在记录发生的时期处理记录。

Kafka有什么优点?

它用于两大类应用程序:

  1. 构建可靠地在系统或应用程序之间获取数据的实时流数据管道
  2. 构建变换或响应数据流的实时流应用程序

要了解Kafka如何做这些事情,让我们从下而上地研究和探索Kafka的功能。

首先几个概念:

  • Kafka作为一个集群在一个或多个服务器上运行。
  • kafka集群按Topic存储的分类数据流
  • 每个记录由一个键,一个值和一个时间戳组成。

Kafka有四个核心API:

  • 生产者API允许应用程序发布流记录到一个或多个kafka的主题。
  • 消费者API允许应用程序订阅一个或多个主题和处理所产生的对他们的记录流。
  • 流API允许应用程序作为流处理器,从一个消费输入流一个或多个主题,并产生一个输出流到一个或多个输出主题,高效地传输这个输入流到输出流。
  • 连接器API允许构建和运行可重复使用的生产者或消费者连接kafka主题到现有的应用程序或数据系统。例如,关系数据库的连接器可能捕获对表的每个更改。

在kafka客户端和服务器之间的通信以简单的,高性能的,语言无关完成TCP协议。此协议版本化,并保持与旧版本的向后兼容性。我们对kafka提供了一个Java客户端,但是客户端在多种语言中都可以使用。

主题和日志

让我们首先深入Kafka提供的记录流的核心抽象 - 主题。

主题是发布记录的类别或Feed名称。主题在Kafka总是多用户; 也就是说,主题可以具有零个,一个或多个订阅订阅其的数据的消费者。

对于每个主题,Kafka集群维护一个分区日志,如下所示:

每个分区是一个有序的,不可变的记录序列,不断地附加到结构化提交日志。在分区中的记录是所谓每个指派顺序ID号的偏移量唯一地标识该分区中的每个记录。

Kafka集群保留所有已发布的记录,无论它们是否已使用可配置的保留期。例如,如果保留策略设置为两天,则在发布记录后的两天内,可以使用该记录,之后将被丢弃以释放空间。Kafka的性能在数据大小方面是有效的,因此长时间存储数据不是问题。

事实上,每个消费者保留的唯一元数据是消费者在日志中的偏移或位置。这种偏移由消费者控制:通常消费者在读取记录时线性地提前其偏移,但是事实上,由于位置由消费者控制,所以它可以按照喜欢的任何顺序来消费记录。例如,消费者可以重置到较旧的偏移以重新处理来自过去的数据或者跳到最近的记录并开始从“现在”消费。

这些功能的结合意味着Kafka消费者非常便宜 - 他们可以来来去去,对群集或其他消费者没有太大的影响。例如,您可以使用我们的命令行工具“拖动”任何主题的内容,而无需更改任何现有用户使用的内容。

日志中的分区有几个目的。首先,它们允许日志扩展到适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但一个主题可能有许多分区,因此它可以处理任意数量的数据。第二,它们作为并行性的单位 - 更多的是在一点。

分配

日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据并请求共享分区。每个分区都跨越可配置数量的服务器进行复制,以实现容错。

每个分区具有用作“领导者”的一个服务器和充当“跟随者”的零个或多个服务器。领导者处理分区的所有读取和写入请求,而关注者被动地复制领导者。如果领导失败,其中一个追随者将自动成为新的领导者。每个服务器作为其一些分区的领导者和为其他分区的追随者,所以负载在集群内是平衡的。

生产者

生产者将数据发布到他们选择的主题。生产者负责选择哪个记录分配给主题内的哪个分区。这可以以循环方式完成以简单地平衡负载,或者它可以根据一些语义分区函数(例如基于记录中的一些密钥)来完成。更多关于使用分区的一秒钟!

消费者

消费者标榜自己与消费群的名称,并发布到一个话题每个记录每个订阅用户组内传送到一个消费者的实例。消费者实例可以在单独的进程中或在单独的机器上。

如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载平衡。

如果所有消费者实例具有不同的消费者组,则每个记录将被广播到所有消费者进程。

两个服务器Kafka集群托管四个分区(P0-P3)与两个消费者组。消费者组A有两个消费者实例,组B有四个。

然而,更常见的是,我们发现主题具有少量的消费者组,每个“逻辑用户”一个。每个组由用于可伸缩性和容错的许多消费者实例组成。这只是发布 - 订阅语义,其中订户是消费者的集群而不是单个进程。

在Kafka中实现的方式是通过划分消费者实例上的日志中的分区,使得每个实例在任何时间点是分区的“公平共享”的独占消费者。维护组中成员资格的过程由Kafka协议动态处理。如果新实例加入组,则它们将从组的其他成员接管一些分区; 如果实例死机,其分区将分发到其余实例。

卡夫卡只提供了记录的总订单中的一个分区,而不是一个主题的不同分区之间。每分区排序结合按键分区数据的能力对于大多数应用程序是足够的。但是,如果您需要对记录进行总排序,则可以使用只有一个分区的主题来实现,但这将意味着每个用户组只有一个使用者进程。

保证

在高级Kafka提供以下保证:

  • 生产者发送到特定主题分区的消息将按照它们发送的顺序附加。也就是说,如果记录M1由与记录M2相同的生成器发送,并且M1首先发送,则M1将具有比M2更低的偏移并且在日志中较早出现。
  • 消费者实例按记录存储在日志中的顺序查看记录。
  • 对于具有复制因子N的主题,我们将允许最多N-1个服务器故障,而不会丢失提交到日志的任何记录。

有关这些保证的更多详细信息,请参见文档的设计部分。

Kafka作为一个消息系统

Kafka的流概念与传统的企业消息系统相比如何?

消息历来有两种型号:队列发布-订阅。在队列中,消费者池可以从服务器读取并且每个记录去往其中一个; 在发布 - 订阅中,记录被广播给所有消费者。这两种模式都有其优点和缺点。排队的优势在于它允许您划分多个消费者实例上的数据处理,这样可以扩展处理。不幸的是,队列不是多用户 - 一旦一个进程读取它消失的数据。发布订阅允许您将广播数据传送到多个进程,但是没有办法缩放处理,因为每个消息都发送给每个订阅者。

Kafka中的消费者群体概念概括了这两个概念。与队列一样,使用者组允许您对一组进程(消费者组的成员)分配处理。与发布 - 订阅一样,Kafka允许您向多个用户组广播消息。

Kafka的模型的优点是每个主题都有这些属性 - 它可以扩展处理,也是多订户 - 没有必要选择一个或另一个。

Kafka也有比传统的消息传递系统更强的顺序保证。

传统队列在服务器上按顺序保留记录,并且如果多个消费者从队列中消耗,则服务器按照它们被存储的顺序发出记录。然而,尽管服务器按顺序发出记录,但是记录被异步地递送给消费者,因此他们可能在不同的消费者处乱序到达。这有效地意味着在存在并行消耗的情况下记录的排序丢失。消息传递系统通常通过具有仅允许一个进程从队列消费的“独占消费者”的概念来解决这个问题,但是当然这意味着在处理中没有并行性。

卡夫卡做得更好。通过在主题内部具有并行性的概念 - 分区 - ,Kafka能够在消费者进程池上提供排序保证和负载均衡。这通过将主题中的分区分配给消费者组中的消费者来实现,使得每个分区仅由组中的一个消费者消费。通过这样做,我们确保消费者是该分区的唯一读取器,并按顺序消耗数据。由于有许多分区,这仍然平衡许多消费者实例上的负载。但请注意,消费者组中不能有比分区更多的消费者实例。

Kafka作为存储系统

任何允许发布消息从消费消息中分离的消息队列实际上充当了正在传输消息的存储系统。Kafka的不同之处在于它是一个非常好的存储系统。

写入到Kafka的数据写入磁盘并复制以用于容错。Kafka允许生产商等待确认,以便写入被认为是完整的,直到它被完全复制,并保证即使服务器写入失败仍然持续。

磁盘结构Kafka使用Scale-well,Kafka将执行相同的,无论您在服务器上有50 KB或50 TB的持久数据。

由于认真对待存储并允许客户端控制其读取位置,您可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的特殊用途分布式文件系统。

用于流处理的Kafka

仅仅读取,写入和存储数据流是不够的,其目的是实现流的实时处理。

在Kafka中,流处理器是从输入主题获取连续数据流,对这个输入执行一些处理,并产生连续数据流到输出主题的任何东西。

例如,零售应用可以接收销售和货物的输入流,并输出从该数据计算出的重新排序和价格调整流。

可以直接使用生产者和消费者API进行简单的处理。然而,对于更复杂的转换卡夫卡提供了一个完全集成的流API。这允许构建执行不重要的处理的应用程序,其计算流的聚合或将流连接在一起。

这个工具帮助解决这种类型的应用面临的硬问题:处理乱序数据,将代码重新处理输入作为代码更改,执行状态计算等。

streams API基于Kafka提供的核心原语:它使用生产者和消费者API用于输入,使用Kafka进行有状态存储,并且在流处理器实例之间使用相同的组机制进行容错。

把碎片放在一起

这种消息传递,存储和流处理的组合可能看起来不寻常,但对于Kafka作为流式传输平台的作用至关重要。

像HDFS这样的分布式文件系统允许存储静态文件以进行批处理。有效像这样的系统允许存储和处理的历史,从过去的数据。

传统的企业邮件系统允许处理在您订阅之后到达的未来邮件。以这种方式构建的应用程序在未来数据到达时处理它。

Kafka结合了这两个功能,这种组合对于Kafka作为流应用程序和流数据流水线的平台至关重要。

通过组合存储和低延迟订阅,流应用程序可以以相同的方式处理过去和未来的数据。这是一个单一的应用程序可以处理历史,存储的数据,而不是结束时,当它到达最后一个记录,它可以保持处理作为未来的数据到达。这是包含批处理以及消息驱动应用程序的流处理的概括概念。

同样,对于流数据流水线,订阅实时事件的组合使得可以将Kafka用于非常低延迟的流水线; 但是可靠地存储数据的能力使得可以将其用于必须保证数据传送的关键数据,或者用于与仅周期性地加载数据的离线系统集成,或者可以长时间地进行维护。流处理设施使得可以在数据到达时变换数据。

有关担保,API和功能的更多信息,卡夫卡提供看剩下的文档

1.2使用案例

这里是对几个受欢迎的用例的Apache Kafka™的描述。对于一些行动,这些领域的概述,看到这个博客帖子

消息

Kafka很好地代替了一个更传统的消息代理。消息代理由于各种原因(用于将处理与数据生成器分离,缓冲未处理的消息等)而使用。与大多数消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和容错功能,使其成为大规模消息处理应用程序的良好解决方案。

在我们的经验中,消息传递使用通常比较低的吞吐量,但可能需要低端到端延迟,并且通常取决于Kafka提供的强大的持久性保证。

在这一领域的卡夫卡媲美传统的邮件系统,如的ActiveMQRabbitMQ的

网站活动跟踪

Kafka的原始用例是能够将用户活动跟踪管道重建为一组实时发布订阅源。这意味着网站活动(网页浏览量,搜索或用户可能采取的其他操作)被发布到中心主题,每个活动类型有一个主题。这些订阅源可用于订阅一系列用例,包括实时处理,实时监控,加载到Hadoop或离线数据仓库系统以进行离线处理和报告。

活动跟踪通常是非常高的量,因为为每个用户页面视图生成许多活动消息。

指标

Kafka通常用于操作监控数据。这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送

日志聚合

许多人使用Kafka替换日志聚合解决方案。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(文件服务器或HDFS可能)进行处理。Kafka提取文件的详细信息,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理和更容易支持多个数据源和分布式数据消耗。与诸如Scribe或Flume的以日志为中心的系统相比,Kafka提供了同样出色的性能,由于复制而提供的更强的持久性保证以及更低的端到端延迟。

流处理

Kafka的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从Kafka主题消费,然后聚合,丰富或以其他方式转换为新主题以供进一步消费或后续处理。例如,用于推荐新闻文章的处理管道可以从RSS订阅源爬行文章内容并将其发布到“文章”主题; 进一步处理可以规范化或去重复该内容并且将经过清洗的文章内容发布到新主题; 最终处理阶段可能会尝试向用户推荐此内容。这样的处理流水线基于各个主题创建实时数据流的图形。在0.10.0.0开始,重量轻,但功能强大的流处理库调用卡夫卡流可在Apache的卡夫卡如上所述进行这样的数据处理。除了卡夫卡流,替代开源流处理工具包括阿帕奇风暴阿帕奇Samza

事件源

事件采购是应用程序的设计风格,其中状态变化会被记录为一个记录时间的有序序列。Kafka对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的极好的后端。

提交日志

Kafka可以作为一种分布式系统的外部提交日志。日志有助于在节点之间复制数据,并作为故障节点恢复其数据的重新同步机制。该日志压实卡夫卡功能有助于支持这种用法。在这种用法卡夫卡类似Apache的会计项目。

1.3快速入门

本教程假设您是新开始的,并且没有现有的Kafka™或ZooKeeper数据。由于卡夫卡控制台脚本是基于Unix和Windows平台上的不同,在Windows平台上使用bin\\windows\\,而不是bin/和修改脚本扩展.bat

步骤1:下载代码

下载的0.10.1.0释放和解压它。

> tar-xzf kafka_2.11-0.10.1.0.tgz 
> cd kafka_2.11-0.10.1.0

步骤2:启动服务器

Kafka使用ZooKeeper,所以你需要首先启动一个ZooKeeper服务器,如果你还没有。您可以使用与kafka一起提供的便利脚本来获取快速清理的单节点ZooKeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] Reading configuration from: config / zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
... ...

现在启动Kafka服务器:

>bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties(kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes overidden to 1048576(kafka.utils.VerifiableProperties)
... ...

步骤3:创建主题

让我们使用单个分区和一个副本创建一个名为“test”的主题:

> bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我们运行list topic命令,我们现在可以看到这个主题:

> bin/ kafka-topics.sh --list --zookeeper localhost:2181
test

或者,您也可以将经销商配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。

步骤4:发送一些消息

Kafka提供了一个命令行客户端,它将从文件或标准输入接收输入,并将其作为消息发送到Kafka集群。默认情况下,每行都将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
这是一个消息
这是另一条消息

步骤5:启动消费者

Kafka还有一个命令行消费者,将消息转储到标准输出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
这是一条消息
这是另一个消息

如果你有上面的每个命令运行在不同的终端,那么你现在应该能够在生产者终端中键入消息,看到它们出现在消费者终端。

所有命令行工具都有其他选项; 运行没有参数的命令将更详细地显示记录它们的使用信息。

步骤6:设置多代理集群

到目前为止,我们一直在对一个单一的broker,但这没有乐趣。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动几个代理实例之外没有什么变化。但只是为了感觉到它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

首先我们做一个配置文件为每个broker的(在Windows上使用copy命令来代替):

> cp config/server.properties config/server-1.properties 
> cp config/server.properties config/server-2.properties

现在编辑这些新文件并设置以下属性:

config/server-1.properties:
    broker.id = 1
    listeners = PLAINTEXT://:9093
    log.dir = /tmp/kafka-logs-1

config/server-2.properties:
    broker.id = 2
    listeners = PLAINTEXT://:9094
    log.dir = /tmp/kafka-logs-2

broker.id属性是集群中的每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们正在同一台机器上运行这些,我们希望保持所有代理尝试在同一端口注册或覆盖彼此的数据。

我们已经有了Zookeeper和我们的单节点启动,所以我们只需要启动两个新的节点:

> bin/kafka-server-start.sh config/server-1.properties &
... ...
> bin/kafka-server-start.sh config/server-2.properties &
... ...

现在创建一个新的主题,复制因子为3:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,但现在我们有一个集群,我们怎么知道哪个代理正在做什么?要看到运行“describe topics”命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
topic:my-replicated-topic partitionCount:1 ReplicationFactor:3 config:
	topic:my-replicated-topic partition:0 leader:1 Replicas:1,2,0 Isr:1,2,0

这里是输出的解释。第一行给出所有分区的摘要,每个附加行给出关于一个分区的信息。因为我们对于这个主题只有一个分区,所以只有一行。

  • “leader”是负责给定分区的所有读取和写入的节点。每个节点将是分区的随机选择部分的领导者。
  • “replicas”是复制此分区的日志的节点的列表,无论它们是否为引导者,或者即使它们当前处于活动状态。
  • “isr”是“同步中”副本的集合。这是副本列表的子集,其当前活动并赶上领导者。

注意,在我的示例中,节点1是主题的唯一分区的leader。

我们可以对我们创建的原始主题运行相同的命令,以查看它在哪里:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
主题:test PartitionCount:1 ReplicationFactor:1配置:
	主题:测试分区:0领导:0副本:0 Isr:0

所以没有什么惊喜 - 原来的主题没有副本,并在服务器0,我们的集群中的唯一的服务器,当我们创建它。

让我们向我们的新主题发布几条消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
... ...
我的测试消息1 
我的测试消息2 
^ C

现在让我们使用这些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
... ...
我的测试消息1
我的测试消息2
^ C

现在让我们测试容错。broker1担任领导者,所以让我们kill了它:

> ps aux | grep server-1.properties 
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java ...
> kill -9 7564

在Windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\\libs\\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\\server-1.properties    644
> taskkill /pid 644 /f

领导已切换到其中一个从属节点,节点1不再处于同步副本集中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

但是消息仍然可用于消费,即使采取写入的领导最初是下来:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

步骤7:使用Kafka Connect导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但您可能需要使用来自其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect来导入或导出数据,而不是编写自定义集成代码。

Kafka Connect是Kafka包含的一个工具,用于向Kafka导入和导出数据。它是运行一个可扩展的工具 连接器,其实现的定制逻辑用于与外部系统交互。在这个快速入门中,我们将看到如何使用简单的连接器来运行Kafka Connect,这些连接器将数据从文件导入Kafka主题,并将数据从Kafka主题导出到文件。

首先,我们将从创建一些种子数据开始测试:

> echo -e "foo\\nbar" > test.txt

接下来,我们将开始两个连接器中运行的独立模式,这意味着他们在一个单一的,本地的,专用的进程中运行。我们提供三个配置文件作为参数。第一个是Kafka Connect过程的配置,包含常见的配置,如要连接的Kafka代理和数据的序列化格式。其余的配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka包含的这些示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器:第一个是源连接器,从输入文件读取行并生成每个Kafka主题,第二个是宿连接器它从Kafka主题读取消息,并将其作为输出文件中的一行生成。

在启动期间,您将看到一些日志消息,包括一些指示正在实例化连接器。一旦连接卡夫卡过程已经开始,源连接器应该开始读取行test.txt,并将其生产的话题connect-test,和水槽连接器应该开始从主题读取消息connect-test ,并将其写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个流水线传送:

>cat test.sink.txt
foo
bar

注意,该数据被存储在卡夫卡主题中connect-test,所以我们也可以执行控制台消费者看到主题中的数据(或使用定制消费者的代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"schema":"type":"string","optional":false,"payload":"foo"
"schema":"type":"string","optional":false,"payload":"bar"
...

连接器继续处理数据,因此我们可以将数据添加到文件,并查看它通过管道移动:

> echo "Another line" >> test.txt

您应该看到该行出现在控制台使用者输出和sink文件中。

步骤8:使用Kafka Streams来处理数据

Kafka Streams是Kafka的客户端库,用于实时流处理和分析存储在Kafka代理中的数据。这个快速入门示例将演示如何运行在此库中编码的流应用程序。这里的要点WordCountDemo示例代码(转换为使用,方便阅读的Java 8 lambda表达式)。

KTable wordCounts = textLines
    //将每个文本行(以空格分隔)转换为单词。
    .flatMapValues(value  - > Arrays.asList(value.toLowerCase()。split(“\\\\ W +”)))

    //确保这些字可用作下一个聚合操作的记录键。
    .map((key,value) - > new KeyValue <>(value,value))

    //计算每个字(记录键)的出现次数,并将结果存储到名为“Counts”的表中。
    .countByKey(“Counts”)

它实现WordCount算法,它从输入文本计算字出现直方图。然而,与其它WORDCOUNT例子,你可能已经看到在此之前,上界数据进行操作时,WORDCOUNT演示应用程序的行为稍有不同,因为它的目的是在一个操作无限的,无限的流数据。与有界变量类似,它是一种有状态算法,用于跟踪和更新单词的计数。然而,由于它必须假定潜在的无界输入数据,它将周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它不知道它何时处理了“全部”输入数据。

我们现在将准备输入数据到Kafka主题,随后将由Kafka Streams应用程序处理。

> echo -e "all streams lead to kafka\\nhello kafka streams\\njoin kafka summit" > file-input.txt

或在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

接下来,我们这个输入的数据发送到输入主题命名流文件输入使用控制台生产者(在实践中,流数据将可能是连续流入卡夫卡其中应用程序将是启动并运行):

> bin/kafka-topics.sh --create \\
            --zookeeper localhost:2181 \\
            --replication-factor 1 \\
            --partitions 1 \\
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

我们现在可以运行WordCount演示应用程序来处理输入数据:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

不会有任何STDOUT输出除日志条目,结果不断地写回另一个主题命名为流-单词计数输出卡夫卡。演示将运行几秒钟,然后,不像典型的流处理应用程序,自动终止。

我们现在可以通过从其输出主题中读取来检查WordCount演示应用程序的输出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
            --topic streams-wordcount-output \\
            --from-beginning \\
            --formatter kafka.tools.DefaultMessageFormatter \\
            --property print.key=true \\
            --property print.value=true \\
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \\
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

并将以下输出数据打印到控制台:

all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

这里,第一列是卡夫卡消息密钥,和第二列是消息值,无论在java.lang.String格式。注意,输出实际上是连续的更新流,其中每个数据记录(即,上面原始输出中的每一行)是单个字(也称为记录密钥,例如“kafka”)的更新计数。对于具有相同键的多个记录,每个稍后的记录是前一个记录的更新。

现在,你可以写更多的输入信息到数据流文件输入主题,并观察加入额外的信息流,单词计数输出的话题,反映了更新的字数(例如,使用上述的控制台生产者和消费者的控制台)。

您可以停止通过控制台消费者按Ctrl-C

1.4生态系统

在主分配之外有很多工具与Kafka集成。的生态系统页列出了许多的这些,包括数据流处理系统,Hadoop的集成,监控和部署工具。

1.5从以前的版本升级

从0.8.x,0.9.x或0.10.0.X升级到0.10.1.0

0.10.1.0有线协议更改。通过遵循以下建议的滚动升级计划,您可以确保在升级期间不会停机。但是,请注意在0.10.1.0潜在的重大更改升级之前。 注意:由于引入了新协议,因此在升级客户端之前升级Kafka集群非常重要(即0.10.1.x客户端仅支持0.10.1.x或更高版本的代理,而0.10.1.x代理也支持旧客户端) 。

滚动升级:

  1. 更新所有broker上的server.properties文件,并添加以下属性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(参见升级后潜在的性能影响对这个配置做什么的详细信息。)
  2. 每次升级一个broker:关闭broker,更新代码,然后重新启动它。
  3. 一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.10.1.0来覆盖协议版本。
  4. 如果先前的消息格式为0.10.0,请将log.message.format.version更改为0.10.1(这是一个无操作,因为消息格式对于0.10.0和0.10.1都相同)。如果您之前的邮件格式版本低于0.10.0,请不要更改log.message.format.version - 此参数只有在所有用户升级到0.10.0.0或更高版本后才会更改。
  5. 重新启动代理,使新协议版本生效。
  6. 如果此时log.message.format.version仍低于0.10.0,请等待所有用户升级到0.10.0或更高版本,然后将log.message.format.version更改为每个代理上的0.10.1,重新启动它们一个一个。

注意:如果你愿意接受的停机时间,你可以简单地把所有的broker停下来,更新代码,并启动所有的broker。他们将默认从新协议开始。

注:修改协议议版本并且可以做到broker进行升级之后的任何时候重新启动。它不必马上就重启。

潜在重大改变在0.10.1.0

  • 日志保留时间不再基于日志段的上次修改时间。相反,它将基于日志段中的消息的最大时间戳。
  • 日志滚动时间不再取决于日志段创建时间。而是现在基于消息中的时间戳。进一步来说。如果段中第一个消息的时间戳为T,则当新消息的时间戳大于或等于T + log.roll.ms时,日志将被推出
  • 0.10.0的打开文件处理程序将增加约33%,因为为每个段添加时间索引文件。
  • 时间索引和偏移索引共享相同的索引大小配置。因为每个索引条目是偏移索引条目的大小的1.5倍。用户可能需要增加log.index.size.max.bytes以避免潜在的频繁日志滚动。
  • 由于索引文件数量的增加,在一些具有大量日志段(例如> 15K)的代理上,代理启动期间的日志加载过程可能更长。根据我们的实验,将num.recovery.threads.per.data.dir设置为1可以减少日志加载时间。

在0.10.1.0的显著变化

  • 新的Java消费者不再处于测试阶段,我们建议将其用于所有新开发。旧的Scala使用者仍然受支持,但在下一个版本中将被弃用,并将在未来的主要版本中删除。
  • --new-consumer/ --new.consumer开关不再需要使用像MirrorMaker和控制台与消费者新的消费工具; 只需要通过一个Kafka代理连接到而不是ZooKeeper集合。此外,控制台消费者与旧消费者的使用已被弃用,并且将在未来的主要版本中删除。
  • Kafka集群现在可以通过集群ID唯一标识。它将在代理升级到0.10.1.0时自动生成。集群ID可通过kafka.server:type = KafkaServer,name = ClusterId度量标准获得,并且它是元数据响应的一部分。串行器,客户端拦截器和度量记录器可以通过实现ClusterResourceListener接口来接收集群ID。
  • BrokerState“RunningAsController”(值4)已删除。由于一个错误,代理只会在转换出来之前处于这种状态,因此删除的影响应该是最小的。检测给定代理是否是控制器的推荐方法是通过kafka.controller:type = KafkaController,name = ActiveControllerCount指标。
  • 新的Java消费者现在允许用户通过分区上的时间戳搜索偏移量。
  • 新的Java消费者现在支持从后台线程的心跳。有一个新的配置 max.poll.interval.ms,它控制轮询调用之前,消费者会主动离开组(5分钟默认情况下)之间的最大时间。配置的值 request.timeout.ms必须始终大于max.poll.interval.ms,因为这是一个JoinGroup请求可以在服务器上阻塞而消费是平衡的最大时间,因此,我们已经改变其缺省值,以略高于5分钟。最后,的默认值session.timeout.ms已被调整至10秒,的默认值max.poll.records被改变为500。
  • 当使用授权者和用户没有描述在题目授权,则代理将不再返回TOPIC_AUTHORIZATION_FAILED错误的请求,因为这泄漏的主题名称。相反,将返回UNKNOWN_TOPIC_OR_PARTITION错误代码。当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的主题错误时自动重试。如果您怀疑这可能发生,您应该咨询客户端日志。
  • 提取响应在默认情况下具有大小限制(对于消费者,为50 MB,对于复制为10 MB)。现有的每个分区限制也适用(对于消费者和复制,为1 MB)。注意,这些限制都不是绝对最大值,如下一点所述。
  • 如果找到大于响应/分区大小限制的消息,则消费者和副本可以进行。更具体地,如果提取的第一非空分区中的第一消息大于任一或两个限制,则仍将返回消息。
  • 重载的构造函数中添加kafka.api.FetchRequestkafka.javaapi.FetchRequest允许调用者指定分区的顺序(因为顺序是V3显著)。先前存在的构造函数已被弃用,并且在发送请求之前对分区进行重排以避免饥饿问题。

新协议版本

  • ListOffsetRequest v1支持基于时间戳的精确偏移量搜索。
  • MetadataResponse v2引入了一个新字段:“cluster_id”。
  • FetchRequest v3支持限制响应大小(除了现有的每个分区限制),它返回大于限制的消息,如果需要进行,并且请求中的分区的顺序现在是重要的。
  • JoinGroup v1引入了一个新字段:“rebalance_timeout”。

从0.8.x或0.9.x升级到0.10.0.0

0.10.0.0具有潜在的重大更改(请在升级前检查),并可能 升级后性能的影响。通过遵循以下建议的滚动升级计划,您可以确保在升级期间和之后不会出现停机时间和性能影响。 注意:由于引入了新协议,因此在升级客户端之前升级Kafka集群非常重要。

注释客户提供0.9.0.0版本:由于0.9.0.0引入了一个错误,即依赖于ZooKeeper的客户(老斯卡拉高层次消费者和MirrorMaker如果与老消费者使用)不会与0.10.0.xbroker的工作。因此,0.9.0.0客户应升级到0.9.0.1 之前的broker都升级到0.10.0.x. 对于0.8.X或0.9.0.1客户端,此步骤不是必需的。

滚动升级:

  1. 更新所有代理上的server.properties文件,并添加以下属性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(参见升级后潜在的性能影响对这个配置做什么的详细信息。)
  2. 升级经纪商。这可以通过简单地把它放下,更新代码,并重新启动代理一次完成。
  3. 一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.10.0.0来颠覆协议版本。注意:您不应该触摸log.message.format.version - 此参数应该只有更改一旦所有用户已升级到0.10.0.0
  4. 重新启动代理,使新协议版本生效。
  5. 一旦所有消费者已升级到0.10.0,将每个代理上的log.message.format.version更改为0.10.0,然后逐个重新启动它们。

注意:如果你愿意接受的停机时间,你可以简单地把所有的broker下来,更新代码,并开始所有的人。他们将默认从新协议开始。

注:碰碰协议版本并重新启动可以做到任何时候broker进行了升级之后。它不必马上就在。

升级到0.10.0.0后潜在的性能影响

0.10.0中的消息格式包括新的时间戳字段,并使用压缩消息的相对偏移量。磁盘消息格式可以通过server.properties文件中的log.message.format.version进行配置。默认磁盘消息格式为0.10.0。如果客户端客户端的版本低于0.10.0.0,它只能理解0.10.0之前的邮件格式。在这种情况下,代理能够在将响应发送到旧版本上的消费者之前将消息从0.10.0格式转换为较早的格式。但是,在这种情况下,代理不能使用零拷贝传输。Kafka社区关于性能影响的报告显示,在升级后,CPU利用率从20%提高到100%,这迫使所有客户端立即升级,使性能恢复正常。为了避免在消费者升级到0.10.0.0之前进行此类消息转换,可以在将代理升级到0.10.0.0时将log.message.format.version设置为0.8.2或0.9.0。这样,代理仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,就可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可以用于支持尚未更新到较新客户端的一些应用,但是即使在过度配置的群集上也支持所有消费者流量是不切实际的。因此,当代理升级但绝大多数客户没有升级时,尽可能避免消息转换是至关重要的。

对于升级到0.10.0.0的客户端,不会对性能产生影响。

注:通过设置消息格式版本,一是证明现有的所有信息都在或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断用户。

注:由于每个消息中引入的附加时间戳,生产者发送小消息可能会看到,因为增加开销的消息吞吐量下降。同样,复制现在每个消息传输额外的8个字节。如果您运行的集群的网络容量接近,可能会淹没网卡,并看到由于过载的故障和性能问题。

注意:如果已启用压缩对生产者,您可能会注意到降低生产吞吐量和/或降低压缩比在某些情况下,broker。当接收压缩消息时,0.10.0代理避免重新压缩消息,这通常减少延迟并提高吞吐量。然而,在某些情况下,这可以减少生产商的批量化尺寸,这可能导致较差的吞吐量。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。此外,用于压缩具有snappy的消息的生成器缓冲器小于代理使用的生成器缓冲器,这可能对磁盘上的消息的压缩比有负面影响。我们打算在未来的Kafka版本中进行配置。

潜在断裂变化0.10.0.0

  • 从Kafka 0.10.0.0开始,Kafka中的消息格式版本表示为Kafka版本。例如,消息格式0.9.0指的是Kafka 0.9.0支持的最高消息版本。
  • 已经介绍了消息格式0.10.0,并且默认使用它。它包括消息中的时间戳字段和相对偏移量用于压缩消息。
  • 已经引入了ProduceRequest / Response v2,它默认使用支持消息格式0.10.0
  • FetchRequest / Response v2已经被引入,它默认使用支持消息格式0.10.0
  • MessageFormatter接口从改变def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader接口从改变def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter的包从改变kafka.toolskafka.common
  • MessageReader的包从改变kafka.toolskafka.common
  • MirrorMakerMessageHandler不再公开handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]),因为它从未被调用的方法。
  • 0.7 KafkaMigrationTool不再与Kafka一起打包。如果您需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照记录的升级过程从0.8升级到0.10.0。
  • 新的消费已经标准化,它的API接受java.util.Collection作为方法参数序列类型。可能必须更新现有代码以使用0.10.0客户端库。
  • LZ4压缩消息处理已更改为使用可互操作的成帧规范(LZ4f v1.5.1)。为了保持与旧客户端的兼容性,此更改仅适用于消息格式0.10.0和更高版本。使用v0 / v1(消息格式0.9.0)生成/获取LZ4压缩消息的客户端应继续使用0.9.0成帧实现。使用Produce / Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f成帧。可互操作的LZ4库的列表可在http://www.lz4.org/

在0.10.0.0的显着变化

  • 从卡夫卡0.10.0.0开始,任命了新的客户端库卡夫卡流可用于流处理存储在卡夫卡的主题数据。这个新的客户端库仅适用于0.10.x和向上版本的代理,由于上面提到的消息格式更改。欲了解更多信息,请阅读本节。
  • 配置参数的默认值receive.buffer.bytes现在是64K的新的消费。
  • 新的消费现在公开的配置参数exclude.internal.topics,从意外被列入正则表达式订阅限制内部主题(如消费者偏移主题)。默认情况下,启用。
  • 旧的Scala生产者已被弃用。用户应尽快将其代码迁移到kafka-clients JAR中包括的Java生成器。
  • 新的消费者API已标记为稳定。

从0.8.0,0.8.1.X或0.8.2.X升级到0.9.0.0

0.9.0.0具有潜在的重大更改(请在升级前检查),并从以前的版本中,券商间的协议更改。这意味着升级的代理和客户端可能与旧版本不兼容。在升级客户端之前升级Kafka集群很重要。如果您使用MirrorMaker下游集群,则应首先升级。

滚动升级:

  1. 更新所有代理上的server.properties文件,并添加以下属性:inter.broker.protocol.version = 0.8.2.X
  2. 升级经纪商。这可以通过简单地把它放下,更新代码,并重新启动代理一次完成。
  3. 一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0来颠覆协议版本。
  4. 重新启动代理,使新协议版本生效

注意:如果你愿意接受的停机时间,你可以简单地把所有的broker下来,更新代码,并开始所有的人。他们将默认从新协议开始。

注:碰碰协议版本并重新启动可以做到任何时候broker进行了升级之后。它不必马上就在。

潜在断裂变化在0.9.0.0

  • 不再支持Java 1.6。
  • 不再支持Scala 2.9。
  • 默认情况下,1000以上的代理ID保留为自动分配的代理ID。如果您的群集具有高于该阈值的现有代理ID,请确保相应地增加reserved.broker.max.id代理配置属性。
  • 配置参数replica.lag.max.messages已删除。分区领导在决定哪些副本处于同步时将不再考虑滞后消息的数量。
  • 配置参数replica.lag.time.max.ms现在不仅指自上次从副本获取请求后经过的时间,而且还指自副本上次被捕获以来的时间。仍然从领导者抓取邮件但未赶上replica.lag.time.max.ms中的最新邮件的副本将被认为不同步。
  • 压缩的主题不再接受没有键的消息,如果尝试这种情况,生产者抛出异常。在0.8.x中,没有键的消息将导致日志压缩线程随后抱怨和退出(并停止压缩所有压缩的主题)。
  • MirrorMaker不再支持多个目标集群。因此,它只接受一个--consumer.config参数。要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的使用者配置。
  • 下包装工具org.apache.kafka.clients.tools *已移至org.apache.kafka.tools。*。所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。
  • 在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。
  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。
  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在主题名称由于使用“。”而导致风险度量标准冲突时打印警告。

    以上是关于kafka官方文档中文翻译(kafka参数解释)的主要内容,如果未能解决你的问题,请参考以下文章

    Apache Kafka官方文档翻译(原创)

    Kafka快速上手(2017.9官方翻译)

    Kafka0.8.2官方文档中文版系列-topic配置参数

    Kafka0.8.2官方文档中文版系列-入门指南

    Kafka介绍

    kafka消息压缩官方说明翻译