访问不同类中的 Spark 广播变量

Posted

技术标签:

【中文标题】访问不同类中的 Spark 广播变量【英文标题】:Access Spark broadcast variable in different classes 【发布时间】:2016-04-15 09:12:27 【问题描述】:

我在 Spark Streaming 应用程序中广播一个值。但我不确定如何在与广播它的类不同的类中访问该变量。

我的代码如下:

object AppMain
  def main(args: Array[String])
    //...
    val broadcastA = sc.broadcast(a)
    //..
    lines.foreachRDD(rdd => 
    val obj = AppObject1
    rdd.filter(p => obj.apply(p))
    rdd.count
  


object AppObject1: Boolean
  def apply(str: String)
    AnotherObject.process(str)
  

object AnotherObject
  // I want to use broadcast variable in this object
  val B = broadcastA.Value // compilation error here
  def process(): Boolean
   //need to use B inside this method
  

谁能建议在这种情况下如何访问广播变量?

【问题讨论】:

【参考方案1】:

忽略可能的序列化问题,这里没有什么特别针对 Spark 的。如果你想使用某个对象,它必须在当前范围内可用,你可以像往常一样实现这一点:

您可以在已定义广播的范围内定义您的助手:


    ...
    val x = sc.broadcast(1)
    object Foo 
      def foo = x.value
    
    ...

您可以将其用作构造函数参数:

case class Foo(x: org.apache.spark.broadcast.Broadcast[Int]) 
  def foo = x.value


...

Foo(sc.broadcast(1)).foo

方法参数

case class Foo() 
  def foo(x: org.apache.spark.broadcast.Broadcast[Int]) = x.value


...

Foo().foo(sc.broadcast(1))

甚至像这样混入你的助手:

trait Foo 
  val x: org.apache.spark.broadcast.Broadcast[Int]
  def foo = x.value


object Main extends Foo 
  val sc = new SparkContext("local",  "test", new SparkConf())
  val x = sc.broadcast(1)

  def main(args: Array[String]) 
    sc.parallelize(Seq(None)).map(_ => foo).first
    sc.stop
  

【讨论】:

将广播变量作为函数参数传递是否有任何性能影响(例如在 curried map 函数中:map(bcast)(row))?典型的 Spark 示例总是在使用它的函数的同一范围内实例化一个广播变量。如果您想将地图函数移出范围但仍引用广播变量怎么办? @zero323 通过其他方法传递广播或广播值直到它到达使用它的位置是一个坏主意吗?就像我在 main 内部创建了一个 run 方法,我将 broadcast.value("foo" in your example) 传递给然后最终传递给 .flatmap?在驱动程序中创建广播是否更好,或者您可以在放置在平面地图中的函数中创建它? @zero323,我们可以将广播变量传递给定义为单独类的 UDF 吗?如果是这样,怎么做?不过,我正在使用 Spark 和 Java。【参考方案2】:

只是对前面介绍的性能注意事项的简短介绍。

zero233 提出的选项确实是在 Scala 中执行此类操作的非常优雅的方式。同时,了解在分布式系统中使用某些模式的含义也很重要。

使用 mixin 方法/任何使用封闭类状态的逻辑都不是最好的主意。每当您在 lambdas 中使用封闭类的状态时,Spark 都必须序列化外部对象。这并不总是正确的,但您最好编写更安全的代码,而不是有一天意外炸毁整个集群。

意识到这一点,我个人会选择将显式参数传递给方法,因为这不会导致外部类序列化(method argument 方法)。

【讨论】:

【参考方案3】:

您可以使用类并将广播变量传递给类

您的伪代码应如下所示:

object AppMain
   def main(args: Array[String])
     //...
     val broadcastA = sc.broadcast(a)
     //..
     lines.foreach(rdd => 
       val obj = new AppObject1(broadcastA)
       rdd.filter(p => obj.apply(p))
       rdd.count
     )
   


class AppObject1(bc : Broadcast[String])
   val anotherObject = new AnotherObject(bc)
   def apply(str: String): Boolean =
      anotherObject.process(str)
   


class AnotherObject(bc : Broadcast[String])
  // I want to use broadcast variable in this object
  def process(str : String): Boolean = 
    val a = bc.value
    true
    //need to use B inside this method
  

【讨论】:

以上是关于访问不同类中的 Spark 广播变量的主要内容,如果未能解决你的问题,请参考以下文章

Spark流处理中的广播变量

火花流中的广播变量空指针异常

从不同名称空间中的不同类访问变量

spark中的广播变量broadcast

在C ++ / Unreal Engine 4中访问不同类中的静态变量

Spark Broadcast总结