如何将 Spark Streaming DStream 制作为 SQL 表

Posted

技术标签:

【中文标题】如何将 Spark Streaming DStream 制作为 SQL 表【英文标题】:How to Make Spark Streaming DStream as SQL table 【发布时间】:2015-12-13 20:53:40 【问题描述】:

这里的目的如下:

每 N 秒使用 Spark Streaming 从 Socket 读取数据

将接收到的数据注册为 SQL 表

将会有更多从HDFS等读取的数据作为参考数据,它们也会被注册为SQL表

这个想法是对组合的流和参考数据执行任意 SQL 查询

请看下面的代码 sn-p。我看到数据是从 forEachRDD 循环的“内部”写入磁盘,但是当写入 forEachRDD 循环的“外部”时,同一个注册的 SQL 表的数据是空的。

请提出您的意见/建议以解决此问题。也欢迎任何其他实现上述“目标”的机制。

case class Record(id:Int, status:String, source:String)

object SqlApp2 
  def main(args: Array[String]) 
    val sparkConf = new SparkConf().setAppName("SqlApp2").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // Create the streaming context with a 10 second batch size
    val ssc = new StreamingContext(sc, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    var alldata:DataFrame=sqlContext.emptyDataFrame
    alldata.registerTempTable("alldata")

    lines.foreachRDD((rdd: RDD[String], time: Time) => 
      import sqlContext.implicits._

      // Convert RDD[String] to DataFrame
      val data = rdd.map(w => 
        val words = w.split(" ")
        Record(words(0).toInt, words(1), words(2))).toDF()

      // Register as table
      data.registerTempTable("alldata")
      data.save("inside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists)  // this data is written properly
    )

    val dataOutside = sqlContext.sql("select * from alldata")
    dataOutside.save("outside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists) // this data is empty, how to make the SQL table registered inside the forEachRDD loop visible for rest of application

    ssc.start()
    ssc.awaitTermination()
  

感谢和问候

MK

【问题讨论】:

【参考方案1】:

我的理解是,除非您使用Structured Streaming route,否则您只能在“foreachRDD”之类的块中创建表。通过您的方法,您可以使用滑动窗口在表中保留一定数量的数据。我在下面给出了相关代码。

// You could create a window of 1 minute to run your query
val windowedStream = lines.window(Seconds(60))

windowedStream.foreachRDD((rdd: RDD[String], time: Time) => 
  import sqlContext.implicits._
  val data = rdd.map(w => 
    val words = w.split(" ")
    Record(words(0).toInt, words(1), words(2))
  ).toDF()
  data.createOrReplaceTempView("alldata")

  // You can read your other data source and convert it into a DF table
  // and join with the 'alldata' table
  val dataInside = sqlContext.sql("select * from alldata")
  dataInside.show()
)

希望这会有所帮助。

请注意,结构化流式传输处于初始阶段,功能非常有限。

【讨论】:

以上是关于如何将 Spark Streaming DStream 制作为 SQL 表的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从Kafka传递到Spark Streaming?

将数据导入 Spark Streaming

spark-streaming scala:如何将字符串数组传递给过滤器?

如何将 Spark Streaming DStream 制作为 SQL 表

如何将 Spark Streaming 检查点位置存储到 S3 中?

如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark