学点 Kafka 流处理

Posted 有关SQL

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学点 Kafka 流处理相关的知识,希望对你有一定的参考价值。


我们都知道淘宝,京东!

每个人都会在上面买东西。我双十一偶尔也剁手买个机械键盘。

当然最多的还是在亚马逊买书,一买都是大几百。

现在屋子里都是书,也是大几百本了。


作为程序员的我们,肯定首关注的是成交量。

这些成交量,是老老实实写进数据库的。

那么另外一些“不老实”的数据,是什么,都写到哪里去了?


我在亚马逊买书的时候,通常会关注推荐给我的书。

亚马逊记录了我的浏览,过段时间会塞到我邮箱里一些书单。

我偶尔打开一看,这些书单还真是想看的书,就顺手下单了。


那么我想这些“不老实”的数据,其实就是我的浏览记录。

成千上万的读者在挑书的时候,都会留下浏览记录。

这些数据就通过流式处理,发送给 Hadoop, Hive , Spark 做计算了。

然后我们登录的时候,就收到类似推荐了。强大的推荐系统!


所以我决定研究一番做类似的数据流处理,需要涉及到哪些领域的知识。


今天是第一篇关于流式处理的文章,以 Kafka 为例。

Kafka 是 LinkedIn 开源的消息处理程序。号称可以支撑百万级的请求。


我们先从简单例子开始:


1 简单例子:


1.1 下载安装包: kafka_2.11-1.0.0.tgz


1.2 设置环境变量:$KAFKA_HOME = /opt/kafka; $PATH=$KAFKA_HOME/bin:$PATH


1.3 运行Kafka 服务:


在运行kafka之前,需要先运行或者找到一台 zookeeper 服务器

可以使用 kafka 安装包下面的 zookeeoper 来做实验


运行 zookeeper 服务:

>zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties


运行 kafka 服务:

>kafka-server-start.sh $KAFKA_HOME/config/server.properties


zookeeper 与 kafka 服务进程,都是一个单独的进程。

因此需要在单独的窗口中运行。


当我们启动了这两个服务之后,可以看到进程窗口是一直在不停的诊听客户端动作


kafka-server-start.sh 其实就是启动了一个单机版的伪集群服务进程,即一个Broker.


如果要建立多个broker,那么只要启动多个kafka-server-start.sh即可。

运行多个kafka-server-start.sh进程在同一台机器上,也可以分布在多台机器上。

我们需要一个 kafka 客户端来像 kafka 服务集群发送消息。


1.4 kafka 一个完整的消息传递流程


1.4.1 创建一个 Topic

>kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ConsoleRealTimeMessage

这里, --zookeeper localhost:2181  在 zookeeper.properties 里面定义好了


1.4.2 围绕着新建的 Topic 发送一些消息

>kafka-console-producer.sh --broker-list localhost:9092 --topic ConsoleRealTimeMessage


在接下来的提示中,随意输入一些消息.

这些信息将会被传入到 Kafka cluster 的 broker 中,并且存储起来。


--broker-list localhost:9092

因为我们安装配置 kafka 的方式是单机版的伪集群,所以这里只有一个 broker.

为什么是9092这个端口,而不是其他端口。这个端口可以在配置文件中指定:

$KAFKA_HOME/config/server.properties

默认的端口便是 9092


--topic 指定像哪一个Topic 发送消息。

新发送的消息即将被送到指定的 topic 下面。

topic 有可能被分区,因此新发送的消息根据分区策略转移到相应分区并存储起来。

如果我们指定一个并不存在的 topic , 发送消息该如何处理?

lumatoMacBook-Pro:config lewis$ kafka-console-producer.sh --topic realmessage --broker-list localhost:9092

>mesage from lenis

[2018-02-22 10:32:27,132] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {realmessage=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)


提示的是 partition leader 不可用。

因此在发送消息之前,一定要先建立对应的 topic

虽然如此,但是消息还是能接收的到:

lumatoMacBook-Pro:config lewis$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic realmessage --from-beginning

mesage from lenis


1.4.3 启动一个 Consumer, 并开始接受消息

> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ConsoleRealTimeMessage --from-beginning


--bootstrap-server localhost:9092


--topic 指定从哪个 topic 来接收数据。

如果不指定或者指定一个不存在的topic,则也会报 Topic Partition Leader 不可用的错误


--from-beginning:

如果不指定,则只接收从当前时间开始,之后 producer 产生的消息


1.5 配置一个 Kafka 集群


kafka 集群,本质就是启动多个 kafka broker.

kafka broker 是由这个命令启动的:

>kafka-server-start.sh $KAFKA_HOME/config/server.properties


理论上,只要server.properties配置妥当,

在哪台机器上启动kafka-server-start.sh是没有限制的

因此我们现在本地配置一个简单的单机版集群:


1.5.1 复制多个 server.properties, 命名为


$KAFKA_HOME/config/server1.properties

$KAFKA_HOME/config/server2.properties


1.5.2 编辑这两个新加的配置文件,如下:


server1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

server2.properties:

broker.id=2

listeners=PLAINTEXT://9094

log.dir=/tmp/kafka-logs-2


1.5.3 分别在两个进程中,调kafka-server-start.sh, 指定不同的配置文件


>kafka-server-start.sh $KAFKA_HOME/config/server1.properties

>kafka-server-start.sh $KAFKA_HOME/config/server2.properties


1.5.4 新建topic, 分发消息,接收消息


当我们是往旧的单机版kafka cluster加入新增的 2 台 kafka broker时

使用的 zookeeper 是同一个机器,所以自动就被设置为同一个cluster

做这么一个假设的前提,是因为我仅仅在两个不同窗口启动了kafka-server-start.sh。配置文件也仅仅是修改了前面的3个项目

而无论我是用localhost:9092, localhost:9093, localhost:9094 发消息

消息始终都可以发到kafka cluster中

并且我无论指向boostrap-server localhost:9092, localhost:9093, localhost:9094

都可以读到发送出来的消息


使用以下命令查看 topic 的分区,replication 信息:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic ConsoleRealTimeMessage


1.5.5 关掉一个kafka broker进程,分发消息,接收消息


假如我们关掉一个 kafka broker:

> ps | grep server1.properties

> kill -9 7553


这时,不能通过这台 kafka broker 来发送消息,也不能通过这台 kafka broker 来接收数据

但是其他两台 kafka broker依然是可以正常工作的


那么这里有个问题了:

既然都可以正常工作,我们设置某一个topic 的replication 数量为 3 ,有什么意义呢?(下篇继续讲)


建立一个有 3 个replica 的 topic:


>kafka-topics.sh --create --zookeeper localhost:2181 --topic ConsoleRealTimeMessageReplica --replication-factor 3 --partitions 1


注意:前面关掉了一个 kafka broker, 所以当 replication factor 超过可用的 kafka broker 数量

就会报错:


Error while executing topic command : Replication factor: 3 larger than available brokers: 2.



1.6 使用 Kafka Connect 来完成导入导出


导入数据到 kafka :


>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties


第一个参数是配置 kafka broker 的常用配置

第二个参数是connect的connector配置,可以指定数据源是什么。


如果能理解 connect-standalone.sh 的运行,将其中获取数据的代码,套入到自己的程序

就可以实现往kafka cluster传送数据了


数据源的配置文件($KAFKA_HOME/config/connect-file-source.properties)如下:


name=local-file-source

connector.class=FileStreamSource

tasks.max=1

file=/opt/dataexport/hongKongTravel.txt

topic=connect-test


导出数据到目的地:


>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties


配置文件($KAFKA_HOME/config/connect-file-sink.properties)如下:


name=local-file-sink

connector.class=FileStreamSink

tasks.max=1

file=/opt/dataexport/hongkong.txt

topics=connect-test


当然我们也可以使用 kafka-consumer 连接到 kafka cluster 读取对应的 topic 消息:


kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Connect-Test --from-beginning


发现 Kafka 的官方文档写的非常好。

简单,引人入胜。

通过这么个可用的例子,给了我们最直观的印象。


如果需要套入自己的网站或者 App 应用里面,

就需要使用 Kafka Api 了。

接下来我会用实例来讲解 Kafka 与SQL Server 的集成!




欢迎关注 【有关SQL】



以上是关于学点 Kafka 流处理的主要内容,如果未能解决你的问题,请参考以下文章

分布式流处理服务—Apache Kafka

Kafka python API 是不是支持流处理?

Kafka流处理平台

如何使用kafka流处理块/批处理数据?

Kafka ETL 的应用及架构解析|告别 Kafka Streams,让轻量级流处理更加简单

Kafka ETL 的应用及架构解析|告别 Kafka Streams,让轻量级流处理更加简单