Kafka快速入门

Posted strongyoung88

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka快速入门相关的知识,希望对你有一定的参考价值。

本文翻译自Kafka官网的Quickstart,主要面向没有使用过kafka、但想快速入门的同学。

Kafka 快速入门

下载

先下载kafka压缩包,并解压。
下载地址:https://kafka.apache.org/downloads
然后解压并切换目录:

> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0

启动服务

启动服务包括两部分:Zookeeper服务和kafka服务。

1、启动Zookeeper服务。Kafka使用Zookeeper,所以,如果我们没有Zookeeper服务,我们需要首先启动一个Zookeeper服务 ,我们可以使用kafka自带的脚本包来启动一个单节点的Zookeeper实例,如下:

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

2、启动Kafka服务,建议重新开个终端。如下:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

创建topic

打开一个新的终到,这里,我们创建一个单分区、只有一个副本、名为test的topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在,我们可以运行以下命令来查看topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

另外,除了可以这样手动创建topic之外,我们还可以配置broker来自动创建topic。

发送消息

打开一个新的终端,运行producer(生产者),然后输入一些信息到控制台,以发送信息到server端。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

启动consumer(消费者)

打开一个新的终端,Kafka也可以启动一个命令行consumer来消费消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

因为我们以上的一些启动命令都是在不同的终端下启动,所以,这个时候,我们可以在producer终端里输入一些消息,然后可以在consumer终端里看到在producer里输入的信息。

所有的命令行工具都有额外的配置项,如果运行命令的时候没有加参数,将会显示使用的详细信息。

配置多个broker的集群

到目前为止,我们已经运行了单个broker,但还不够有趣。对于kafka,单个broker就是一个大小为1的集群,因此,相比启动要一个多broker实例,不会有太多的东西要改变。但是,要获得这种感觉,我们要把我们的集群扩大到三个节点(仍然在我们的本地机器)。

首先,我们要对每个broker进行配置一个配置文件,如下:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

现在,编辑这些新的配置文件,如下:
编辑 config/server-1.properties:

    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

编辑 config/server-2.properties:

    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

在同一个集群上,broker.id属性是每个节点唯一的,持久化的名字。当我们运行这些多broker在同一台机器上的时候,我们必须重写端口号和日志目录。

因为我们已经启动了Zookeeper和单个节点的kafka,所以,我们只需要启动两个新的节点:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在,我们来创建一个新的topic,这个topic的副本因子为3,名字为my-replicated-topic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

但是,我们如何能够知道,指定的broker正在做什么呢?我们可以使用“describe topics”命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

以下是对输出的解释。第一行给出了所有分区的概况,每个额外的行,给出的是一个分区的信息。因为对于这个topic我们只有一个分区,所以,只有一行。

  • “leader” 它对给出分区所有的读/写都是有责任的。对于一个随机选择的分区部分,每个节点都将会是一个leader。
  • “replicas” 是多个节点的列表,这些节点复制了这个分区的日志,不管它们是不是leader,也不管它们是否还活着。
  • “isr” 是”in-sync”副本的集合。这是副本列表的子集,这个列表当前是活着的,且是leader。

注意:在例子中的节点1是topic的唯一分区的leader

我们可以对我们原先创建的topic运行相同的命令来查看它在哪:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

不要感到惊讶,原先的topic没有副本,而且在server 0 上,当我们创建它之后,集群中只有这一个server。

让我们发布一些消息到我们新的topic上:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在,我们可以消费这些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在,我们可以测试容错。Broker 1正扮演着leader,所以,我们来杀掉它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows下使用以下命令:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

领导(leader)已经切换成了slaves中的另外一个,并且,节点1不再是在“in-sync”副本集合中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是,消息仍然可以被消费,尽管Leader之前有取过数据:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

使用Kafka Connect来导入/导出数据

对于入门来说,从控制台读数据和写回数据到控制是一个很方便的地方,但是,你可能更想从其他源来使用数据,或者从Kafka导出数据到其他系统。例如一些系统,我们可以使用Kafka Connect来导入或导出数据,来代替写自定义的集成代码。

Kafka Connect是一个工具,包括了能够导入和导出数据到Kafka。它是一个扩展工具,这个工具运行实现了用户与外部系统交互逻辑的connectors。在这个quick start中,我们将会看到如何使用简单的connectors来运行Kafka Connect,以此从一个文件导入数据到一个Kafka topic,并且从Kafka topic中导出数据到一个文件。

首先,我们通过创建一些种子数据来测:

> echo -e "foo\\nbar" > test.txt

在Windows系统中使用以下命令:

> echo foo> test.txt
> echo bar>> test.txt

接下来,我们将以standalone模式来启动两个connectors,这意味着他们将运行在单个、本地、专有的进程中。我们提供三个配置文件作为参数。第一个是Kafka connect进程的配置文件,包含公共的配置,例如Kafka brokers连接和数据序列化格式。剩下的配置文件每个指定一个要创建的connector。这些文件包含一个唯一的connector名字,要实例化的connector类,以及connector需要的任意其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

包括Kafka,这些样本配置文件,使用默认的本地集群配置和创建两个connectos:第一个是source connectors,它从一个输入文件读取每一行,且把每一行生成到Kafka topic;第二个是sink connector,它从Kafka topic读取消息,且把每个消息作为一行输出到文件中。

在启动期间,我们将会看到许多日志信息,包括connector将被实例化的指令。一旦kafka connect进程启动后,source connector应该从test.txt中读取数据行,并且生产这些行到topicconnect-test,sink connector应该从topic connect-test读取消息,并且把这些消息写到文件test.sink.txt中。我们可以通过检查输出文件的内容来验证数据被传输:

> more test.sink.txt
foo
bar

注意,这个数据被存储在kafka topicconnect-test中,所以,我们也可以运行一个控制台消费者来查看topic中的数据:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"schema":"type":"string","optional":false,"payload":"foo"
"schema":"type":"string","optional":false,"payload":"bar"
...

connectors继续处理数据,因此,我们可以通过管道往文件中加入数据,然后看它通传输。

> echo Another line>> test.txt

我们应该可以在控制台的消费者的输出上和sink文件中,看到数据的变化。

使用Kafka Streams来处理数据

Kafka Streams是一个客户端库,用于构建严格的任务、实时应用和微服务,它的输入或输出数据存储在Kafka集群上。Kafka Streams使得编写、部署面向客户端的标准Java和Scala应用变得简单,受益于Kafka的服务端集群技术,使得这些应用高可用、弹性、容错、分布式等等。Quickstart例子将描述用这个库如何运行一个流式应用

以上是关于Kafka快速入门的主要内容,如果未能解决你的问题,请参考以下文章

Kafka快速入门

Spark 从 Kafka 读数并发问题

Kafka知识概况

Kafka快速入门

Kafka快速入门——Kafka架构

Kafka快速入门——Kafka架构