Kafka快速入门
Posted strongyoung88
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka快速入门相关的知识,希望对你有一定的参考价值。
本文翻译自Kafka官网的Quickstart,主要面向没有使用过kafka、但想快速入门的同学。
Kafka 快速入门
下载
先下载kafka压缩包,并解压。
下载地址:https://kafka.apache.org/downloads
然后解压并切换目录:
> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0
启动服务
启动服务包括两部分:Zookeeper服务和kafka服务。
1、启动Zookeeper服务。Kafka使用Zookeeper,所以,如果我们没有Zookeeper服务,我们需要首先启动一个Zookeeper服务 ,我们可以使用kafka自带的脚本包来启动一个单节点的Zookeeper实例,如下:
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
2、启动Kafka服务,建议重新开个终端。如下:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
创建topic
打开一个新的终到,这里,我们创建一个单分区、只有一个副本、名为test的topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在,我们可以运行以下命令来查看topic
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
另外,除了可以这样手动创建topic之外,我们还可以配置broker来自动创建topic。
发送消息
打开一个新的终端,运行producer(生产者),然后输入一些信息到控制台,以发送信息到server端。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
启动consumer(消费者)
打开一个新的终端,Kafka也可以启动一个命令行consumer来消费消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
因为我们以上的一些启动命令都是在不同的终端下启动,所以,这个时候,我们可以在producer终端里输入一些消息,然后可以在consumer终端里看到在producer里输入的信息。
所有的命令行工具都有额外的配置项,如果运行命令的时候没有加参数,将会显示使用的详细信息。
配置多个broker的集群
到目前为止,我们已经运行了单个broker,但还不够有趣。对于kafka,单个broker就是一个大小为1的集群,因此,相比启动要一个多broker实例,不会有太多的东西要改变。但是,要获得这种感觉,我们要把我们的集群扩大到三个节点(仍然在我们的本地机器)。
首先,我们要对每个broker进行配置一个配置文件,如下:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
现在,编辑这些新的配置文件,如下:
编辑 config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
编辑 config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
在同一个集群上,broker.id
属性是每个节点唯一的,持久化的名字。当我们运行这些多broker在同一台机器上的时候,我们必须重写端口号和日志目录。
因为我们已经启动了Zookeeper和单个节点的kafka,所以,我们只需要启动两个新的节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
现在,我们来创建一个新的topic,这个topic的副本因子为3,名字为my-replicated-topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
但是,我们如何能够知道,指定的broker正在做什么呢?我们可以使用“describe topics”命令:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出的解释。第一行给出了所有分区的概况,每个额外的行,给出的是一个分区的信息。因为对于这个topic我们只有一个分区,所以,只有一行。
- “leader” 它对给出分区所有的读/写都是有责任的。对于一个随机选择的分区部分,每个节点都将会是一个leader。
- “replicas” 是多个节点的列表,这些节点复制了这个分区的日志,不管它们是不是leader,也不管它们是否还活着。
- “isr” 是”in-sync”副本的集合。这是副本列表的子集,这个列表当前是活着的,且是leader。
注意:在例子中的节点1是topic的唯一分区的leader
我们可以对我们原先创建的topic运行相同的命令来查看它在哪:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
不要感到惊讶,原先的topic没有副本,而且在server 0 上,当我们创建它之后,集群中只有这一个server。
让我们发布一些消息到我们新的topic上:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在,我们可以消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在,我们可以测试容错。Broker 1正扮演着leader,所以,我们来杀掉它:
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
在Windows下使用以下命令:
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f
领导(leader)已经切换成了slaves中的另外一个,并且,节点1不再是在“in-sync”副本集合中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是,消息仍然可以被消费,尽管Leader之前有取过数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
使用Kafka Connect来导入/导出数据
对于入门来说,从控制台读数据和写回数据到控制是一个很方便的地方,但是,你可能更想从其他源来使用数据,或者从Kafka导出数据到其他系统。例如一些系统,我们可以使用Kafka Connect来导入或导出数据,来代替写自定义的集成代码。
Kafka Connect是一个工具,包括了能够导入和导出数据到Kafka。它是一个扩展工具,这个工具运行实现了用户与外部系统交互逻辑的connectors。在这个quick start中,我们将会看到如何使用简单的connectors来运行Kafka Connect,以此从一个文件导入数据到一个Kafka topic,并且从Kafka topic中导出数据到一个文件。
首先,我们通过创建一些种子数据来测:
> echo -e "foo\\nbar" > test.txt
在Windows系统中使用以下命令:
> echo foo> test.txt
> echo bar>> test.txt
接下来,我们将以standalone模式来启动两个connectors,这意味着他们将运行在单个、本地、专有的进程中。我们提供三个配置文件作为参数。第一个是Kafka connect进程的配置文件,包含公共的配置,例如Kafka brokers连接和数据序列化格式。剩下的配置文件每个指定一个要创建的connector。这些文件包含一个唯一的connector名字,要实例化的connector类,以及connector需要的任意其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
包括Kafka,这些样本配置文件,使用默认的本地集群配置和创建两个connectos:第一个是source connectors,它从一个输入文件读取每一行,且把每一行生成到Kafka topic;第二个是sink connector,它从Kafka topic读取消息,且把每个消息作为一行输出到文件中。
在启动期间,我们将会看到许多日志信息,包括connector将被实例化的指令。一旦kafka connect进程启动后,source connector应该从test.txt
中读取数据行,并且生产这些行到topicconnect-test
,sink connector应该从topic connect-test
读取消息,并且把这些消息写到文件test.sink.txt
中。我们可以通过检查输出文件的内容来验证数据被传输:
> more test.sink.txt
foo
bar
注意,这个数据被存储在kafka topicconnect-test
中,所以,我们也可以运行一个控制台消费者来查看topic中的数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"schema":"type":"string","optional":false,"payload":"foo"
"schema":"type":"string","optional":false,"payload":"bar"
...
connectors继续处理数据,因此,我们可以通过管道往文件中加入数据,然后看它通传输。
> echo Another line>> test.txt
我们应该可以在控制台的消费者的输出上和sink文件中,看到数据的变化。
使用Kafka Streams来处理数据
Kafka Streams是一个客户端库,用于构建严格的任务、实时应用和微服务,它的输入或输出数据存储在Kafka集群上。Kafka Streams使得编写、部署面向客户端的标准Java和Scala应用变得简单,受益于Kafka的服务端集群技术,使得这些应用高可用、弹性、容错、分布式等等。Quickstart例子将描述用这个库如何运行一个流式应用
以上是关于Kafka快速入门的主要内容,如果未能解决你的问题,请参考以下文章