如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?

Posted

技术标签:

【中文标题】如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?【英文标题】:How to create InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)? 【发布时间】:2016-01-21 00:05:22 【问题描述】:

如何在 Pyspark 中使用 KafkaUtils.createDirectStream 和特定 Topic 的偏移量?

【问题讨论】:

【参考方案1】:

如果您想从 Kafka 主题中的记录创建 RDD,请使用一组静态元组。

使所有导入可用

from pyspark.streaming.kafka import KafkaUtils, OffsetRange

然后你创建一个 Kafka Brokers 的字典

kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"

然后你创建你的偏移对象

start = 0
until = 10
partition = 0
topic = 'topic'    
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]

最后你创建了 RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)

要创建带有偏移量的 Stream,您需要执行以下操作:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

然后你使用你的 sparkcontext 创建你的 sparkstreaming 上下文

ssc = StreamingContext(sc, 1)

接下来我们设置所有参数

 kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"
 start = 0
 partition = 0
 topic = 'topic'    

然后我们创建我们的 fromOffset 字典

topicPartion = TopicAndPartition(topic,partition)
fromOffset = topicPartion: long(start)
//notice that we must cast the int to long 

最后我们创建了 Stream

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 
fromOffsets=fromOffset)

【讨论】:

但我收到错误“TypeError: unhashable type: 'TopicAndPartition'” 这对于 Kafka 0.8 和 Spark 2.0+ 已经过时了 :(【参考方案2】:

你可以这样做:

from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = topicpartion: int(start)
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
        "metadata.broker.list": brokers, fromOffsets = fromoffset)

注意:Spark 2.2.0、python 3.6

【讨论】:

以上是关于如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 QOpenGLTexture 中创建带 alpha 的纹理?

如何在 django 中创建带参数的链接

如何在 Android 中创建带圆角的 ListView?

如何在 Oracle 12c 中创建带条件的索引?

如何在React MUI中创建带标签的TextField

如何在 Swift 4 的 UIView 中创建带圆角的渐变边框