如何保存 Spark Java Dstream RDD

Posted

技术标签:

【中文标题】如何保存 Spark Java Dstream RDD【英文标题】:How to save Spark Java Dstream RDD 【发布时间】:2015-05-11 11:43:33 【问题描述】:

Spark Scala API DStream 提供了一个方法saveAsTextFiles 将 Dstream RDD 存储在 HDFS 上。

但是Spark Java API's DStream中没有对应的方法

如何使用 Spark Java API 在 HDFS 中存储 DStream RDD?

【问题讨论】:

【参考方案1】:

Time 参数可用于为实际路径添加前缀/后缀。

myrdd.foreachRDD(new Function2<JavaPairRDD<Integer, String>, Time, Void>() 
    public Void call(JavaPairRDD<Integer, String> rdd) 
                        rdd.saveAsTextFile(path + "-" + time.toString().split(" ")[0]);
                        return null;
                    
                );

【讨论】:

【参考方案2】:

使用 Dstream 的 foreach 方法,可以先从 Dstream 中获取所有 RDD,然后使用 saveAsTextFile 方法保存这些 rdd。

这里是示例代码

sortedCounts.foreach(new Function<JavaPairRDD<Integer, String>, Void>() 
                    public Void call(JavaPairRDD<Integer, String> rdd) 
                        rdd.saveAsTextFile(path);
                        return null;
                    
                );

【讨论】:

以上代码替换了每个流式传输间隔的输出路径..(数据覆盖) 您可以在输出路径字符串中添加时间戳【参考方案3】:

尝试使用dstream() 方法将JavaDStream 转换为DStream。比如……

lines.dstream().saveAsObjectFiles("pre", "suf")

【讨论】:

【参考方案4】:

如果JavaDStream对象为dstream,目录路径为path,则可以另存为

 dstream.foreachRDD(rdd -> 
                rdd.saveAsTextFile(path);
            );

【讨论】:

【参考方案5】:

使用 JavaDStream 类的 forEachRDD API。

【讨论】:

以上是关于如何保存 Spark Java Dstream RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming DStream的output操作以及foreachRDD详解

将带有joda.DateTime的案例类的DStream转换为Spark DataFrame

Spark-从Kafka读取数据

如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

深入理解Spark Streaming

如何将 Spark Streaming DStream 制作为 SQL 表