如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?

Posted

技术标签:

【中文标题】如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?【英文标题】:how to write a spark-transformed data back to a kafka broker using pyspark? 【发布时间】:2016-05-19 22:04:07 【问题描述】:

在我的 pyspark 应用程序中,我打算使用 Spark 流作为一种在“飞行中”转换 Kafka 消息的方法。每条这样的消息最初都是从特定的 Kafka 主题接收的。此类消息将需要进行一些转换(假设 - 用一个字符串替换另一个字符串),并且转换后的版本需要发布在不同的 Kafka 主题上。 第一部分(接收 Kafka 消息)似乎工作正常:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], "metadata.broker.list": brokers)
    ...

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

将某些东西(比如说 - 一个字符串)放到不同的 Kafka 主题上的正确语法是什么? 这种方法应该由 KafkaUtils 提供,还是以其他方式提供?

【问题讨论】:

【参考方案1】:

在处理函数中,我们可以对每条记录执行任何操作,然后将该记录发送到不同的 kafka 主题:

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], "metadata.broker.list": brokers)
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

要运行这个:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test

【讨论】:

请注意,不应该在驱动程序上建立连接,然后在执行程序之间共享(这是一个糟糕的想法,因为 a)它可能无法正常工作,并且 b)如果这样做会增加严重的开销),但在每个分区上创建。几个相关链接michael-noll.com/blog/2014/10/01/… Spark Streaming 指南上的“使用 foreachRDD 的设计模式”部分spark.apache.org/docs/1.1.0/…【参考方案2】:

根据SPARK文档的正确做法 https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

def kafka_sender(messages):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    for message in messages:
        producer.send('alerts', bytes(message[0].encode('utf-8')))
        # For faster push
        # producer.flush()  

    producer.flush()



# On your Dstream
sentiment_data.foreachRDD(lambda rdd: rdd.foreachPartition(kafka_sender))

【讨论】:

以上是关于如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?的主要内容,如果未能解决你的问题,请参考以下文章

数据类型转换火花数据框列 - pyspark

将多个火花数据框列转换为具有列表类型的单列

Pyspark 结构化流处理

火花流到pyspark json文件中的数据帧

存储火花数据框-pyspark

Pyspark:从列表的 RDD 创建一个火花数据框,其中列表的某些元素是对象