在 KafkaUtils.createstream() 中使用“topics”参数的正确方法是啥?

Posted

技术标签:

【中文标题】在 KafkaUtils.createstream() 中使用“topics”参数的正确方法是啥?【英文标题】:What is the correct way to use the "topics" parameter in KafkaUtils.createstream()?在 KafkaUtils.createstream() 中使用“topics”参数的正确方法是什么? 【发布时间】:2018-06-18 01:53:18 【问题描述】:

我在 python 中一起使用 spark-streaming 和 kafka,并松散地跟随 this post,但我对前面提到的 KafkaUtils.createStream() 函数有点困惑。

documentation 并没有通过明确解释主题字典的影响来做很多事情。但我怀疑我只是这么认为,因为我对 kafka 工作原理的了解是不稳定的,答案是显而易见的。

我知道它应该是这样的字典:"topic.name": 1,我可以模仿文档并说这意味着创建的流将从单个分区消耗。

所以我想我只是想澄清一下这个特定函数的用法,以及我对 kafka 概念的理解。我们将使用以下示例:

假设我定义了一个主题 my.topic,它有 3 个分区,其传入消息按一个键拆分,我们只说一个用户 ID。

如果我像这样初始化流:

from pyspark.streaming.kafka import KafkaUtils

kafkaStream = KafkaUtils.createStream(
    ssc, 
    'kafka:2181', 
    'consumer-group-name', 
    'my.topic':1
)  

我认为这个流只会从单个分区消耗,因此不会看到每条消息都进入my.topic,这是否正确?换句话说,它只会看到来自用户 ID 的消息发送到 3 个分区之一?

那么我的问题:

    如何正确设置此参数以消费发送到my.topic的所有消息?

    我的直觉是我只需将主题参数设置为'my.topic': 3,那么我的问题就变成了:

    为什么我会使用小于分区总数的数字?

    我的直觉告诉我,这取决于你正在做的工作有多“原子”。例如,如果我只是简单地将数据转换(例如,从 CSV 转换为 JSON 文档列表或其他内容)然后将上述 3 个流中的每个流都设置为 'my.topic': 1 作为它们的主题参数,并且同一消费者组的所有部分将是通过启用每个分区的并行消费来获得好处,因为不需要共享有关所消费的每条消息的信息。

    同时,如果我正在计算旨在涵盖整个主题的实时指标 I.E.带有过滤器等的时间窗平均值。我很难找到一种方法来实现类似的东西而不设置'my.topic': 3,或者如果它像一个总和,那么对消费者组内的每个组件信号进行稍微复杂的下游处理即Sum1 + Sum2 + Sum3 = TotalSum

    但我的知识再次处于与 Kafka 和 Spark 玩耍的“初出茅庐”阶段。

    有没有办法告诉 createStream() 从所有分区消费,而无需提前知道有多少? 'my.topic': -1 之类的东西?

    可以在一个流中指定多个主题吗? IE。 'my.topic': 1, 'my.other.topic': 1

我真的很讨厌这个问题的答案只是“是的,你的直觉是正确的。”。最好的情况是有人告诉我我误解了一切并让我直截了当。所以请...这样做!

【问题讨论】:

【参考方案1】:

这是 Kafka-Spark 集成页面中提到的内容。

val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic 要消费的 Kafka 分区数])

KafkaUtils.createStream 将创建一个接收器并使用 Kafka 主题。

“每个主题要消耗的 Kafka 分区数”选项表示此接收器将并行读取多少个分区。

假设您有一个名为“Topic1”的主题,有 2 个分区,并且您提供了选项 'Topic1':1,那么 Kafka 接收器将一次读取 1 个分区 [它最终会读取所有分区,但会读取一次一个分区]。这样做的原因是为了读取分区中的消息并保留数据写入主题的顺序。

例如,Topic1 的 partition1 有消息 1,11,21,31,41,partition2 有消息 2,12,22,32,42,然后使用上述设置读取将产生类似 1,11,21,31,41,2,12,22,32,42。每个分区中的消息是单独读取的,因此不会混合在一起。

如果您提供选项为 'Topic1':2,则接收方将一次读取 2 个分区,并且这些分区内的消息将混合在一起。对于上面相同的示例,具有 'Topic1':2 的接收器将产生类似 1,2,11,12,21,22....

将此视为接收器可以在给定主题分区上执行的并行读取次数。

5.一个流中可以指定多个主题吗? 是的你可以。

【讨论】:

啊,我完全被误导了。谢谢,陌生人。【参考方案2】:

只需指定没有分区的主题,无论主题有多少个分区,您都会得到该主题中的所有消息。

您只需查看示例代码:https://github.com/apache/spark/blob/v2.2.1/examples/src/main/python/streaming/direct_kafka_wordcount.py#L48

【讨论】:

该代码示例使用 createDirectStream,这显然略有不同,并被标记为实验性 回来表示感谢,因为您的评论让我回去重新阅读集成文档,我想我现在更好地理解了 createDirectStream,它最终成为了我想要的。所以谢谢!

以上是关于在 KafkaUtils.createstream() 中使用“topics”参数的正确方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

Scala创建SparkStreaming获取Kafka数据代码过程

如何将火花流输出转换为数据帧或存储在表中

Spark Streaming Kafka 接收器 API - numPartitions

将 RDD.cartesian 与 Spark Streaming 结合使用是不是存在错误?

分配的变量引用在哪里,在堆栈中还是在堆中?

NOIP 2015 & SDOI 2016 Round1 & CTSC 2016 & SDOI2016 Round2游记