如何使用python 连接kafka 并获取数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用python 连接kafka 并获取数据相关的知识,希望对你有一定的参考价值。

参考技术A 连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。
我现在使用 samsa 这个 highlevel 库
Producer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg

Tip
consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。
在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。
kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。本回答被提问者采纳

以上是关于如何使用python 连接kafka 并获取数据的主要内容,如果未能解决你的问题,请参考以下文章

即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

如何在使用 REST API 创建 Kafka 连接器时定义模式

如何根据连接器名称获取 Kafka 源连接器架构

kafka consumer重新连接后如何获取当前最新数据

如何在kafka-python和confluent-kafka之间做出选择

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示