如何使用Python以编程方式在Apache Kafka中创建主题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Python以编程方式在Apache Kafka中创建主题相关的知识,希望对你有一定的参考价值。

到目前为止,我没有看到一个python客户端显式实现主题的创建,而不使用配置选项自动创建主题。

答案

您可以使用kafka-pythonconfluent_kafka客户端以编程方式创建主题,该客户端是围绕librdkafka的轻量级包装器。

使用kafka-python

from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)

使用confluent_kafka

from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient("bootstrap_servers": "localhost:9092")

topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)
另一答案

如果您可以运行confluent_kafka(Python)v0.11.6或更高版本,那么以下是如何创建kafka主题,列出kafka主题和删除kafka主题:

>>> import confluent_kafka.admin, pprint

>>> conf        = 'bootstrap.servers': 'broker01:9092'
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)

>>> new_topic   = confluent_kafka.admin.NewTopic('topic100', 1, 1)
                  # Number-of-partitions  = 1
                  # Number-of-replicas    = 1

>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
    'topic100': <Future at 0x7f524b0f1240 state=running> # Stdout from above command.

>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
    'topic100' : TopicMetadata(topic100, 1 partitions),
     'topic99'  : TopicMetadata(topic99,  1 partitions),
     'topic98'  : TopicMetadata(topic98,  1 partitions)

要使用相同的kafka_admin对象删除kafka主题,请执行以下操作:

kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE

我希望这些操作有所帮助。

另一答案

看起来您可以使用以下内容来确保您的主题已经存在(我假设您使用以下kafka python实现):

client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)
另一答案

似乎没有kafka服务器api来创建主题,因此您必须使用主题自动创建或命令行工具:

bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
另一答案

已经太晚了。我不知道有关显式创建主题的命令,但以下内容创建并添加了消息。

我创建了一个python kafka生产者:

prod = KafkaProducer(bootstrap_servers='localhost:9092')
for i in xrange(1000):
    prod.send('xyz', str(i))

在Kafka主题列表中,xyz以前没有。当我执行上述方法时,Python-kafka客户端创建了它并将消息添加到它。

另一答案

在Kafka 0.11(最初用于Java)中添加了进行编程主题创建和配置所需的AdminClient API

https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

预计非Java客户端库也会随着时间的推移添加此功能。请与您正在使用的Kafka Python客户端的作者(有几个)一起查看是否以及何时在API中使用KIP-4管理协议支持

https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

另一答案
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'topic-name'

producer.send(topic, final_list[0]).get(timeout=10)

以上是关于如何使用Python以编程方式在Apache Kafka中创建主题的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中以编程方式启动 HiveThriftServer

如何在 Python 中以编程方式传递密码

如何以编程方式在C ++或Python中列出DLL的依赖项?

如何以编程方式在 Python 中按控制键和右箭头键?

如何以编程方式卸载我的 MSIX python 应用程序?

如何在kivymd python的MDDataTable中以编程方式取消选中复选框