如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?



如何在 Pyspark 中使用 KafkaUtils.createDirectStream 和特定 Topic 的偏移量?



如果您想从 Kafka 主题中的记录创建 RDD,请使用一组静态元组。


from pyspark.streaming.kafka import KafkaUtils, OffsetRange

然后你创建一个 Kafka Brokers 的字典

kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"


start = 0
until = 10
partition = 0
topic = 'topic'    
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]

最后你创建了 RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)

要创建带有偏移量的 Stream,您需要执行以下操作:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

然后你使用你的 sparkcontext 创建你的 sparkstreaming 上下文

ssc = StreamingContext(sc, 1)


 kafkaParams = "metadata.broker.list": "host1:9092,host2:9092,host3:9092"
 start = 0
 partition = 0
 topic = 'topic'    

然后我们创建我们的 fromOffset 字典

topicPartion = TopicAndPartition(topic,partition)
fromOffset = topicPartion: long(start)
//notice that we must cast the int to long 

最后我们创建了 Stream

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 


from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = topicpartion: int(start)
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
        "metadata.broker.list": brokers, fromOffsets = fromoffset)

注意:Spark 2.2.0、python 3.6


