如何使用火花流检查 rdd 是不是为空?

Posted

技术标签:

【中文标题】如何使用火花流检查 rdd 是不是为空?【英文标题】:how to check if rdd is empty using spark streaming?如何使用火花流检查 rdd 是否为空? 【发布时间】:2019-02-27 00:10:40 【问题描述】:

我有以下 pyspark 代码,用于从 logs/ 目录读取日志文件,然后仅当其中包含数据时才将结果保存到文本文件中……换句话说,当 RDD 不为空时。但我在实施它时遇到了问题。我已经尝试过 take(1) 和 notempty。由于这是 dstream rdd,我们不能对其应用 rdd 方法。如果我遗漏了什么,请告诉我。

conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf = conf)

ssc = StreamingContext(sc, 3)   #Streaming will execute in each 3 seconds
lines = ssc.textFileStream('/Users/rocket/Downloads/logs/')  #'logs/ mean directory name
audit = lines.map(lambda x: x.split('|')[3])
result = audit.countByValue()
#result.pprint()
#result.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
# Print the first ten elements of each RDD generated in this DStream to the console
if result.foreachRDD(lambda rdd: rdd.take(1)):
    result.pprint()
    result.saveAsTextFiles("/Users/rocket/Downloads/output","txt")
else:
    result.pprint()
    print("empty")

【问题讨论】:

【参考方案1】:

正确的结构应该是

import uuid 

def process_batch(rdd):
    if not rdd.isEmpty():
        result.saveAsTextFiles("/Users/rocket/Downloads/output-".format(
          str(uuid.uuid4())
        ) ,"txt")


result.foreachRDD(process_batch)

但是,正如您在上面看到的,每个批次都需要一个单独的目录,因为 RDD API 没有append 模式。

替代方案可能是:

def process_batch(rdd):
    if not rdd.isEmpty():
       lines = rdd.map(str)
       spark.createDataFrame(lines, "string").save.mode("append").format("text").save("/Users/rocket/Downloads/output")

【讨论】:

以上是关于如何使用火花流检查 rdd 是不是为空?的主要内容,如果未能解决你的问题,请参考以下文章

如何检查火花数据框是不是为空?

如何将火花流 DF 写入 Kafka 主题

关于火花流的变换功能的困惑

如何直接在 Azure Blob 存储上存储火花作业(结构化流)的检查点?

如何检查 RDD

如何在火花中使用`saveATextFile`保存`wholeTextFile` RDD的结果?