如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark

Posted

技术标签:

【中文标题】如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark【英文标题】:How to convert RDD to DataFrame in Spark Streaming, not just Spark 【发布时间】:2016-10-12 10:40:05 【问题描述】:

如何将Spark Streaming 中的RDD 转换为DataFrame,而不仅仅是Spark

我看到了这个例子,但它需要SparkContext

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

就我而言,我有StreamingContext。然后我应该在foreach 中创建SparkContext 吗?看起来太疯狂了……那么,如何处理这个问题呢?我的最终目标(如果可能有用的话)是使用rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");DataFrame 保存在Amazon S3 中,如果不将RDD 转换为DataFrame(据我所知),这是不可能的。

myDstream.foreachRDD  rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()

【问题讨论】:

查看此链接docs.cloud.databricks.com/docs/latest/databricks_guide/… @Shankar:他在哪里定义 AWS 访问密钥? foreachRDD 中写入的任何内容都会在驱动程序中执行,因此您可以创建sqlContext 并将rdd 转换为DF,然后写入S3 @Shankar:我仍然误解:我应该在foreachRDD 之外创建 StreamingContext 和 SparkContext 吗?在您发布的示例中,我找不到 sqlContext 的定义位置。我尝试重现此示例,但它给了我一个错误,即找不到 sqlContext。我不想让事情变得过于复杂,这就是为什么我会询问最简单的解决方案。 【参考方案1】:

foreachRDD之外创建sqlContext,一旦使用sqlContextrdd转换为DF,就可以写入S3了。

例如:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD  rdd =>

    val df = rdd.toDF()
    df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")

更新:

您甚至可以在 foreachRDD 中创建 sqlContext,这将在 Driver 上执行。

【讨论】:

我测试了这个例子。它说Cannot resolve symbol 指的是saveAsTextFile。我使用 scala 2.11 和 spark 1.6.2。 试试textspark.apache.org/docs/1.6.1/api/scala/… 另一个问题是我收到有关多个 SparkContext 的错误。我认为那是因为我同时拥有 SparkContext 和 StreamingContext:val ssc = new StreamingContext(conf, Seconds(refreshingIntervalSeconds.toInt)) val sc = new SparkContext(conf) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", Utils.getAWS_ACCESS_KEY()) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", Utils.getAWS_SECRET_KEY()) val sqlContext = new SQLContext(sc) 试试val ssc = new StreamingContext(sc, Seconds(refreshingIntervalSeconds.toInt))【参考方案2】:

查看以下答案,其中包含 python 笔记本中的 scala 魔法单元: How to convert Spark Streaming data into Spark DataFrame

【讨论】:

以上是关于如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming的核心DStream之转换操作实例

使用 pyspark 在 Spark Streaming 中的 to.JSON()

在 Spark Streaming 中,如何检测空批次?

sh 如何在YARN上配置Spark Streaming作业以获得良好的弹性(http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-y

如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

Spark Streaming:如何在流上加载管道?