带有广播连接的 Spark 流式传输

Posted

技术标签:

【中文标题】带有广播连接的 Spark 流式传输【英文标题】:Spark streaming with broadcast joins 【发布时间】:2016-02-19 16:03:06 【问题描述】:

我有一个 spark 流用例,我计划在每个执行器上保持广播和缓存数据集。流式传输中的每个微批次都将从 RDD 中创建一个数据帧并加入该批次。下面给出的我的测试代码将为每个批次执行广播操作。有没有办法只播放一次?

val testDF = sqlContext.read.format("com.databricks.spark.csv")
                .schema(schema).load("file:///shared/data/test-data.txt") 

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => 
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(broadcast(testDF), "Age")
    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
    

对于每个批次,都会读取此文件并执行广播。

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

对广播数据集有任何建议吗?

【问题讨论】:

【参考方案1】:

看起来现在广播的表没有被重用。见:SPARK-3863

foreachRDD循环外执行广播:

val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv")
 .schema(schema).load(...))

lines.foreachRDD((rdd, timestamp) =>  
  val recordDF = ???
  val resultDF = recordDF.join(testDF, "Age")
  resultDF.write.format("com.databricks.spark.csv").save(...)

【讨论】:

我也尝试过这种方法,但它没有按预期播放。可能是因为 foreachRDD 是在驱动程序的上下文中执行的。顺便说一句,我们必须在 join 语句中使用 testDF.value。我认为这是一个错字。 谢谢!附带说明一下, sc.broadcast(someDataFrame) 会在广播之前将数据带到驱动程序,还是会从每个执行程序进行比特流式广播?我总是在 SQL 中使用广播提示。想知道有什么区别。 这行不通。我的意思是你可以广播DataFrame(毕竟是Serializable),但你不能在DDS上嵌套操作。您可以简单地收集、转换为比Array[Row] 更有用的东西并广播本地数据结构。然后只需使用 UDF。

以上是关于带有广播连接的 Spark 流式传输的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Phonegap 流式传输在线广播?

从在线广播流式传输音频

将实时视频广播从 android 相机流式传输到服务器

使用 AVPlayer 从 url 流式传输广播 - > iPhone 播放器控件不起作用

Spark 广播连接将数据加载到驱动程序

用于多个连接的 Spark SQL 广播 [重复]