如何在火花中对每个执行器执行一次操作

Posted

技术标签:

【中文标题】如何在火花中对每个执行器执行一次操作【英文标题】:How to perform one operation on each executor once in spark 【发布时间】:2017-02-22 06:27:51 【问题描述】:

我有一个存储在 S3 中的 weka 模型,大小约为 400MB。 现在,我有一些记录,我想在这些记录上运行模型并执行预测。

对于执行预测,我尝试的是,

    将模型作为静态对象下载并加载到驱动程序上,并将其广播给所有执行程序。对预测 RDD 执行映射操作。 ----> 不起作用,因为在 Weka 中执行预测,需要修改模型对象,并且广播需要只读副本。

    将模型作为静态对象下载并加载到驱动程序上,并在每个映射操作中将其发送到执行程序。 -----> 工作(效率不高,因为在每个地图操作中,我传递了 400MB 对象)

    在驱动程序上下载模型并将其加载到每个执行程序上并缓存在那里。 (不知道该怎么做)

有人知道如何在每个执行器上加载模型一次并缓存它,以便其他记录不再加载它吗?

【问题讨论】:

从 Spark 2.4 开始,有一个 Java 接口 ExecutorPlugin 允许自定义 init()shutdown() > issues.apache.org/jira/browse/SPARK-24918 【参考方案1】:

你有两个选择:

1。创建一个带有表示数据的惰性 val 的单例对象:

    object WekaModel 
        lazy val data = 
            // initialize data here. This will only happen once per JVM process
        
           

然后,您可以在 map 函数中使用惰性 val。 lazy val 确保每个工作 JVM 初始化他们自己的数据实例。不会为data 执行序列化或广播。

    elementsRDD.map  element =>
        // use WekaModel.data here
    

优势

效率更高,因为它允许您为每个 JVM 实例初始化一次数据。例如,当需要初始化数据库连接池时,这种方法是一个不错的选择。

缺点

对初始化的控制较少。例如,如果您需要运行时参数,则初始化对象会比较棘手。 如果需要,您无法真正释放或释放对象。通常这是可以接受的,因为操作系统会在进程退出时释放资源。

2。在 RDD 上使用mapPartition(或foreachPartition)方法,而不仅仅是map

这允许您初始化整个分区所需的任何内容。

    elementsRDD.mapPartition  elements =>
        val model = new WekaModel()

        elements.map  element =>
            // use model and element. there is a single instance of model per partition.
        
    

优势

在对象的初始化和取消初始化方面提供更大的灵活性。

缺点

每个分区都会创建并初始化对象的一个​​新实例。根据每个 JVM 实例有多少个分区,这可能是也可能不是问题。

【讨论】:

你确定#1吗?我收到了序列化错误。还有,如果数据初始化依赖运行时参数怎么办? 方法#1 不应该发生任何序列化。如果有,很可能您在 RDD 方法中引用了一个中间对象。关于你关于初始化的问题,确实比较难控制。您的运行时参数也需要静态可用(例如,通过系统属性或配置文件)。单例初始化不是 Spark 特有的。这是一个 Scala 主题。 你能在 Java 中做同样的事情吗? 关于这个缺点,Less control over initialization. For example, it's trickier to initialize your object if you require runtime parameters.。这正是我想要达到的目标。你有任何例子或者你见过这样做吗?我正在调用外部系统以获取数据库连接配置。所以理想情况下,我不想在每个执行程序上调用外部系统。我刚刚问了这个非常相似的问题。 ***.com/questions/47241882/… 初始化数据库连接的一种可能方法是通过系统属性(例如System.getProperty("db.host")【参考方案2】:

这对我来说比惰性初始化器更有效。我创建了一个初始化为 null 的对象级指针,并让每个执行器对其进行初始化。在初始化块中,您可以使用一次性代码。请注意,每个处理批次都会重置局部变量,但不会重置对象级变量。

object Thing1 
  var bigObject : BigObject = null

  def main(args: Array[String]) : Unit = 
    val sc = <spark/scala magic here>
    sc.textFile(infile).map(line => 
      if (bigObject == null) 
         // this takes a minute but runs just once
         bigObject = new BigObject(parameters)  
      
      bigObject.transform(line)
    )
  

这种方法每个执行器只创建一个大对象,而不是其他方法的每个分区创建一个大对象。

如果您将 var bigObject : BigObject = null 放在主函数命名空间中,它的行为会有所不同。在这种情况下,它会在每个分区(即批处理)的开头运行 bigObject 构造函数。如果您有内存泄漏,那么这最终会杀死执行程序。垃圾收集也需要做更多的工作。

【讨论】:

如果您的spark.executor.cores 大于1,这将多次调用new BigObject。惰性方法可防止并发初始化。 @Dan 你的意思是lazy var bigObject ... @Dale,您的代码在技术上不是线程安全的,因为如果多个执行器同时运行,它们可以初始化您的全局对象。【参考方案3】:

这是我们通常做的事情

    定义一个做这些事情的单例客户端,以确保每个执行程序中只有一个客户端

    有一个 getorcreate 方法来创建或获取客户端信息,通常让您有一个要为多个不同模型提供服务的通用服务平台,然后我们可以使用类似 concurrentmap 来确保线程安全和无计算

    getorcreate 方法将在 RDD 级别内调用,如 transform 或 foreachpartition,因此请确保 init 发生在执行程序级别

【讨论】:

【参考方案4】:

您可以通过广播一个带有惰性 val 的案例对象来实现这一点,如下所示:

case object localSlowTwo lazy val value: Int = Thread.sleep(1000); 2
val broadcastSlowTwo = sc.broadcast(localSlowTwo)
(1 to 1000).toDS.repartition(100).map(_ * broadcastSlowTwo.value.value).collect

三个线程的三个执行器上的事件时间线如下所示:

从同一个 spark-shell 会话再次运行最后一行不再初始化:

【讨论】:

以上是关于如何在火花中对每个执行器执行一次操作的主要内容,如果未能解决你的问题,请参考以下文章

手动指定火花执行器的数量

如果满足任何(不是全部)条件,如何执行火花连接

如何使火花同时运行作业中的所有任务?

如何在查询中对日期执行操作?

火花提交:--jars 不起作用

对数据帧火花执行操作时出现空指针异常