如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?

Posted

技术标签:

【中文标题】如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?【英文标题】:How to subscribe to a list of multiple kafka wildcard patterns using kafka-python? 【发布时间】:2017-01-24 00:25:40 【问题描述】:

我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符表示动态客户 ID。

consumer.subscribe(pattern='customer.*.validations')

这很好用,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能以听一个类似的主题,目的略有不同。我们称之为customer.*.additional-validations。代码需要存在于同一个项目中,因为有很多功能是共享的,但我需要能够根据队列的类型采用不同的路径。

在Kafka documentation 中,我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(不工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

【问题讨论】:

【参考方案1】:

在KafkaConsumer代码中,它支持主题列表,或者模式,

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

   def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

因此,您可以使用 | 创建一个带有 OR 条件的正则表达式,它应该可以订阅多个动态主题正则表达式,因为它在内部使用 re 模块进行匹配。

(customer.*.validations)|(customer.*.additional-validations)

【讨论】:

【参考方案2】:

在Confluent Kafka library 中,订阅没有pattern 关键字,而是处理以^ 开头的正则表达式模式。

def subscribe(self, topics, on_assign=None, *args, **kwargs):
    """
    Set subscription to a supplied list of topics
    This replaces a previous subscription.
        
    Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
        
        consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
    """

【讨论】:

以上是关于如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 入门--安装配置和 kafka-python 调用

kafka-python KafkaConsumer 多分区提交偏移量

Kafka-python 检索主题列表

kafka-python消费者读取数据时自定义偏移量,自定义数据读取的顺序

kafka-python 1.4.7 版本触发的一个 rebalance 问题

Kafka SASL/PLAIN加密 及Kafka-Python整合