如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?
Posted
技术标签:
【中文标题】如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?【英文标题】:How to create InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)? 【发布时间】:2016-01-21 00:05:22 【问题描述】:如何在 Pyspark 中使用 KafkaUtils.createDirectStream
和特定 Topic
的偏移量?
【问题讨论】:
【参考方案1】:如果您想从 Kafka 主题中的记录创建 RDD,请使用一组静态元组。
使所有导入可用
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
然后你创建一个 Kafka Brokers 的字典
kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"
然后你创建你的偏移对象
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
最后你创建了 RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
要创建带有偏移量的 Stream,您需要执行以下操作:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
然后你使用你的 sparkcontext 创建你的 sparkstreaming 上下文
ssc = StreamingContext(sc, 1)
接下来我们设置所有参数
kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"
start = 0
partition = 0
topic = 'topic'
然后我们创建我们的 fromOffset 字典
topicPartion = TopicAndPartition(topic,partition)
fromOffset = topicPartion: long(start)
//notice that we must cast the int to long
最后我们创建了 Stream
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
【讨论】:
但我收到错误“TypeError: unhashable type: 'TopicAndPartition'” 这对于 Kafka 0.8 和 Spark 2.0+ 已经过时了 :(【参考方案2】:你可以这样做:
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = topicpartion: int(start)
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
"metadata.broker.list": brokers, fromOffsets = fromoffset)
注意:Spark 2.2.0、python 3.6
【讨论】:
以上是关于如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?的主要内容,如果未能解决你的问题,请参考以下文章