如何利用pykafka远程消费 zookeeper+kafka集群 python脚本

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何利用pykafka远程消费 zookeeper+kafka集群 python脚本相关的知识,希望对你有一定的参考价值。

#从kafka消费
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)

#从ZOOKEEPER消费
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情况下,设置此变量,表示从最新的开始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)
参考技术A 首先,申明下版本
zookeeper-3.4.7.tar.gz
kafka_2.10-0.8.2.2.tgz 这个一定要用0.8.2X的版本 经过多次实验验证 切记切记

Kafka
version pykafka? rdkafka?
0.8.1 No No
0.8.2 Yes Yes
0.9.0 Planned Planned

zookeeper的配置就不多说了 建议大家安装一个ZKUI 包名zkui-2.0-SNAPSHOT-jar-with-dependencies.jar 配置文件可以找我要啊 网上应该也有

kafka的配置我这里就提及下重点 关于host.name这个参数
如果我们想远程消费这个KAFKA 一定要把这个定义成能访问的IP 比如我想在内网消费这个KAFKA 就要用内网IP绑定
host.name=192.168.0.10

启动zookeeper 和 kafka

flume沿用这个配置
#定义agent的名字为statge_nginx
stage_nginx.sources = S1
stage_nginx.channels = M1
stage_nginx.sinks = sink

#定义source的一些设置
stage_nginx.sources.S1.type = exec
stage_nginx.sources.S1.channels = M1
stage_nginx.sources.S1.command = tail -F /logs/nginx/log/www/info.access.log

#定义sink
stage_nginx.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
stage_nginx.sinks.sink.topic = t_nginx
stage_nginx.sinks.sink.brokerList = 172.31.9.125:9091
stage_nginx.sinks.sink.requiredAcks = 0
stage_nginx.sinks.sink.batchSize = 20

stage_nginx.sinks.sink.channel = M1

#定义channel
stage_nginx.channels.M1.type = memory
stage_nginx.channels.M1.capacity = 100

我们消费下nginx的日志

下面是在内网远程消费

直接消费KAKFA
#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient
import logging

client = KafkaClient(hosts="192.168.1.10:9092")

topic = client.topics['t_nginx']

consumer = topic.get_simple_consumer(
consumer_group="simplegroup",
# auto_offset_reset=OffsetType.EARLIEST,
reset_offset_on_start=True
)

for message in consumer:
if message is not None:
print message.offset, message.value

从ZOOKEEPER消费

#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient
import logging

client = KafkaClient(hosts="192.168.1.10:2181,192.168.1.10:2182,192.168.1.10:2183")

topic = client.topics['t_nginx']

balanced_consumer= topic.get_balanced_consumer(
consumer_group='group1',
auto_commit_enable=True,
# reset_offset_on_start=True,
zookeeper_connect='192.168.1.10:2181,192.168.1.10:2182,192.168.1.10:2183'
)

for message in balanced_consumer:
if message is not None:
print message.offset, message.value

其他的操作大家自己改啊 比如分析啊 入库啊 什么的

以上是关于如何利用pykafka远程消费 zookeeper+kafka集群 python脚本的主要内容,如果未能解决你的问题,请参考以下文章

DUBBO分布式RPC远程调用思想

Dubbo的RPC远程过程调用+Dubbo的负载均衡+Zookeeper注册中心

如何调用dubbo注册在zookeeper中的消费者

谈谈我对dubbo的理解(每个阶段理解不同,会持续跟新)

Kafka放弃Zookeeper后如何持存储主题与消费组呢?

SpringBoot-整合Dubbo+Zookeeper