PySpark 处理流数据并将处理后的数据保存到文件

Posted

技术标签:

【中文标题】PySpark 处理流数据并将处理后的数据保存到文件【英文标题】:PySpark Processing Stream data and saving processed data to file 【发布时间】:2016-12-25 23:55:25 【问题描述】:

我正在尝试复制一个正在流式传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。 我正在使用 Kafka 和 Spark 流式传输(在 pyspark 上),这是我的架构:

1-Kafka 生产者以以下字符串格式向名为 test 的主题发出数据:

"LG float LT float" example : LG 8100.25191107 LT 8406.43141483

生产者代码:

from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(0,10000):
    lg_value = str(random.uniform(5000, 10000))
    lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)

producer.flush()

生产者工作正常,我在消费者(甚至火花)中获得流数据

2- Spark 流正在接收这个流,我什至可以pprint()

Spark 流处理代码

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], "bootstrap.servers": "localhost:9092")

lines = kvs.map(lambda x: x[1])

words      = lines.flatMap(lambda line: line.split(" "))
words.pprint()
word_pairs = words.map(lambda word: (word, 1))
counts     = word_pairs.reduceByKey(lambda a, b: a+b)
results    = counts.foreachRDD(lambda word: word.saveAsTextFile("C:\path\spark_test.txt"))
//I tried this kvs.saveAsTextFiles('C:\path\spark_test.txt')
// to copy all stream and it works fine
ssc.start()
ssc.awaitTermination()

我得到一个错误:

16/12/26 00:51:53 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker did not connect back in time

还有其他例外。

我真正想要的是将每个条目"LG float LT float" 保存为文件中的 JSON 格式,但首先我想简单地将坐标保存在文件中,我似乎无法做到这一点。有什么想法吗?

如果需要,我可以提供完整的堆栈跟踪

【问题讨论】:

【参考方案1】:

我这样解决了这个问题,所以我做了一个函数来保存每个 RDD,在文件中,这是解决我问题的代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], "bootstrap.servers": "localhost:9092")

lines = kvs.map(lambda x: x[1])

coords      = lines.map(lambda line: line)

def saveCoord(rdd):
    rdd.foreach(lambda rec: open("C:\path\spark_test.txt", "a").write(
        ""+rec.split(" ")[0]+":"+rec.split(" ")[1]+","+rec.split(" ")[2]+":"+rec.split(" ")[3]+",\n"))
coords.foreachRDD(saveCoord)
coords.pprint()

ssc.start()
ssc.awaitTermination()

【讨论】:

另一个改进是添加kvs.foreachPartition() 用于新建立新连接,如下所述:sungsoo.github.io/2015/04/08/…

以上是关于PySpark 处理流数据并将处理后的数据保存到文件的主要内容,如果未能解决你的问题,请参考以下文章

Python数据处理 | 批量提取文件夹下的csv文件,每个csv文件根据列索引提取特定几列,并将提取后的数据保存到新建的一个文件夹

Python数据处理 | 批量提取文件夹下的csv文件,每个csv文件根据列索引提取特定几列,并将提取后的数据保存到新建的一个文件夹

我可以使用 spark 2.3.0 和 pyspark 从 Kafka 进行流处理吗?

pyspark 结构数据处理

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

使用 xargs 和 awk 处理多个文件并将处理后的数据写入单独的文件