如何使用python 连接kafka 并获取数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用python 连接kafka 并获取数据相关的知识,希望对你有一定的参考价值。

参考技术A 连接
kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka

brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel

Producer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
producer

kafka

topic
里面提交数据后才能连接,否则会出错。

Kafka
中一个
consumer
需要指定
groupname

groue
中保存着
offset
等信息,新开启一个
group
会从
offset
0
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。

如何使用python 连接kafka 并获取数据

参考技术A 连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。
我现在使用 samsa 这个 highlevel 库
Producer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg

Tip
consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。
在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。
kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。本回答被提问者采纳

以上是关于如何使用python 连接kafka 并获取数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 kafka 连接 JDBC sink 和 source 使用 python

如何在kafka-python和confluent-kafka之间做出选择

即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

python连接kafka生产者,消费者脚本

如何在 Strimzi 中使用 Kafka 连接

Kafka 如何使用 SSL 连接 Elasticsearch?