python 消费kafka 写入es 小记

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 消费kafka 写入es 小记相关的知识,希望对你有一定的参考价值。

参考技术A # -*- coding: utf8 -*-

# __author__ = '小红帽'

# Date: 2020-05-11

"""Naval Fate.

Usage:

        py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>

        py_kafka_protobuf_consume.py -h | --help

        py_kafka_protobuf_consume.py --version

Options:

        -h --help                                      打印帮助信息.

        --bootstrap_servers=<host:port,host2:port2..>  kafka servers

        --groupId=<groupId>                            kafka消费组

        --topic=<topic_name>                            topic名称

        --es-servers=<host:port>                        ES 地址

        --index=<index_name>                            ES 索引

        --type=<doc> ES type

        --id=<order_id> 指定id主键,快速更新

"""

import json

from kafka import KafkaConsumer

from docopt import docopt

from elasticsearch import Elasticsearch

from elasticsearch import helpers

class Kafka_consumer():

    def __init__(self,args):

        self.topic = args['--topic']

        self.bootstrapServers = args['--bootstrap-servers']

        self.groupId = args['--groupId']

        self.id = args['--id']

        self.es_host = args['--es-servers'].split(':')[0]

        self.es_port = args['--es-servers'].split(':')[1]

        self.es_index = args['--index']

        self.es_type = args['--type']

        self.consumer = KafkaConsumer(

            bootstrap_servers=self.bootstrapServers,

            group_id=self.groupId,

            enable_auto_commit = True,

            auto_commit_interval_ms=5000,

            consumer_timeout_ms=5000

        )

    def consume_data_es(self):

        while True:

            try:

                es = Elasticsearch(['host': self.es_host, 'port': self.es_port], timeout=3600)

                self.consumer.subscribe([self.topic])

                actions=[]

                for message in self.consumer:

                    if message is not None:

                        query = json.loads(message.value)['data'][0]

                        action =

                            "_index": self.es_index,

                            "_type": self.es_type,

                            "_id": json.loads(message.value)['data'][0][self.id],

                            "_source": query

                       

                        actions.append(action)

                        if len(actions) > 50:

                            helpers.bulk(client=es, actions=actions)

                            print("插入es %s 条数据" % len(actions))

                            actions = []

                if len(actions) > 0:

                    helpers.bulk(client=es, actions=actions)

                    print("等待超时时间,插入es %s 条数据" % len(actions))

                    actions=[]

            except BaseException as e:

                print(e)

if __name__ == '__main__':

    arguments = docopt(__doc__,version='sbin 1.0')

    consumer = Kafka_consumer(arguments)

    consumer.consume_data_es()

以上是关于python 消费kafka 写入es 小记的主要内容,如果未能解决你的问题,请参考以下文章

使用生成器把Kafka写入速度提高1000倍

Logstash接收Kafka数据写入至ES

FlinkFlink Kafka 消费卡死 消费组卡死 topic无写入 实际有数据 topic正常

消息队列之kafka(消费语义)

Kafka如何保证百万级写入速度 并 保证不丢失不重复消费

腾讯云Logstash实战4-使用Logstash消费kafka数据并写入到Elasticsearch