python 消费kafka 写入es 小记
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 消费kafka 写入es 小记相关的知识,希望对你有一定的参考价值。
参考技术A # -*- coding: utf8 -*-# __author__ = '小红帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 打印帮助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消费组
--topic=<topic_name> topic名称
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主键,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch(['host': self.es_host, 'port': self.es_port], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action =
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
actions.append(action)
if len(actions) > 50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 条数据" % len(actions))
actions = []
if len(actions) > 0:
helpers.bulk(client=es, actions=actions)
print("等待超时时间,插入es %s 条数据" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
以上是关于python 消费kafka 写入es 小记的主要内容,如果未能解决你的问题,请参考以下文章