如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark
Posted
技术标签:
【中文标题】如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark【英文标题】:How to convert RDD to DataFrame in Spark Streaming, not just Spark 【发布时间】:2016-10-12 10:40:05 【问题描述】:如何将Spark Streaming
中的RDD
转换为DataFrame
,而不仅仅是Spark
?
我看到了这个例子,但它需要SparkContext
。
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
就我而言,我有StreamingContext
。然后我应该在foreach
中创建SparkContext
吗?看起来太疯狂了……那么,如何处理这个问题呢?我的最终目标(如果可能有用的话)是使用rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");
将DataFrame
保存在Amazon S3 中,如果不将RDD
转换为DataFrame
(据我所知),这是不可能的。
myDstream.foreachRDD rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
【问题讨论】:
查看此链接docs.cloud.databricks.com/docs/latest/databricks_guide/… @Shankar:他在哪里定义 AWS 访问密钥?foreachRDD
中写入的任何内容都会在驱动程序中执行,因此您可以创建sqlContext
并将rdd
转换为DF
,然后写入S3
。
@Shankar:我仍然误解:我应该在foreachRDD
之外创建 StreamingContext 和 SparkContext 吗?在您发布的示例中,我找不到 sqlContext
的定义位置。我尝试重现此示例,但它给了我一个错误,即找不到 sqlContext
。我不想让事情变得过于复杂,这就是为什么我会询问最简单的解决方案。
【参考方案1】:
在foreachRDD
之外创建sqlContext
,一旦使用sqlContext
将rdd
转换为DF,就可以写入S3了。
例如:
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
myDstream.foreachRDD rdd =>
val df = rdd.toDF()
df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
更新:
您甚至可以在 foreachRDD
中创建 sqlContext
,这将在 Driver 上执行。
【讨论】:
我测试了这个例子。它说Cannot resolve symbol
指的是saveAsTextFile
。我使用 scala 2.11 和 spark 1.6.2。
试试text
spark.apache.org/docs/1.6.1/api/scala/…
另一个问题是我收到有关多个 SparkContext 的错误。我认为那是因为我同时拥有 SparkContext 和 StreamingContext:val ssc = new StreamingContext(conf, Seconds(refreshingIntervalSeconds.toInt)) val sc = new SparkContext(conf) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", Utils.getAWS_ACCESS_KEY()) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", Utils.getAWS_SECRET_KEY()) val sqlContext = new SQLContext(sc)
试试val ssc = new StreamingContext(sc, Seconds(refreshingIntervalSeconds.toInt))
【参考方案2】:
查看以下答案,其中包含 python 笔记本中的 scala 魔法单元: How to convert Spark Streaming data into Spark DataFrame
【讨论】:
以上是关于如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming的核心DStream之转换操作实例
使用 pyspark 在 Spark Streaming 中的 to.JSON()
sh 如何在YARN上配置Spark Streaming作业以获得良好的弹性(http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-y