如何在火花中对每个执行器执行一次操作
Posted
技术标签:
【中文标题】如何在火花中对每个执行器执行一次操作【英文标题】:How to perform one operation on each executor once in spark 【发布时间】:2017-02-22 06:27:51 【问题描述】:我有一个存储在 S3 中的 weka 模型,大小约为 400MB。 现在,我有一些记录,我想在这些记录上运行模型并执行预测。
对于执行预测,我尝试的是,
将模型作为静态对象下载并加载到驱动程序上,并将其广播给所有执行程序。对预测 RDD 执行映射操作。 ----> 不起作用,因为在 Weka 中执行预测,需要修改模型对象,并且广播需要只读副本。
将模型作为静态对象下载并加载到驱动程序上,并在每个映射操作中将其发送到执行程序。 -----> 工作(效率不高,因为在每个地图操作中,我传递了 400MB 对象)
在驱动程序上下载模型并将其加载到每个执行程序上并缓存在那里。 (不知道该怎么做)
有人知道如何在每个执行器上加载模型一次并缓存它,以便其他记录不再加载它吗?
【问题讨论】:
从 Spark 2.4 开始,有一个 Java 接口ExecutorPlugin
允许自定义 init()
和 shutdown()
> issues.apache.org/jira/browse/SPARK-24918
【参考方案1】:
你有两个选择:
1。创建一个带有表示数据的惰性 val 的单例对象:
object WekaModel
lazy val data =
// initialize data here. This will only happen once per JVM process
然后,您可以在 map
函数中使用惰性 val。 lazy val
确保每个工作 JVM 初始化他们自己的数据实例。不会为data
执行序列化或广播。
elementsRDD.map element =>
// use WekaModel.data here
优势
效率更高,因为它允许您为每个 JVM 实例初始化一次数据。例如,当需要初始化数据库连接池时,这种方法是一个不错的选择。缺点
对初始化的控制较少。例如,如果您需要运行时参数,则初始化对象会比较棘手。 如果需要,您无法真正释放或释放对象。通常这是可以接受的,因为操作系统会在进程退出时释放资源。2。在 RDD 上使用mapPartition
(或foreachPartition
)方法,而不仅仅是map
。
这允许您初始化整个分区所需的任何内容。
elementsRDD.mapPartition elements =>
val model = new WekaModel()
elements.map element =>
// use model and element. there is a single instance of model per partition.
优势:
在对象的初始化和取消初始化方面提供更大的灵活性。缺点
每个分区都会创建并初始化对象的一个新实例。根据每个 JVM 实例有多少个分区,这可能是也可能不是问题。【讨论】:
你确定#1吗?我收到了序列化错误。还有,如果数据初始化依赖运行时参数怎么办? 方法#1 不应该发生任何序列化。如果有,很可能您在 RDD 方法中引用了一个中间对象。关于你关于初始化的问题,确实比较难控制。您的运行时参数也需要静态可用(例如,通过系统属性或配置文件)。单例初始化不是 Spark 特有的。这是一个 Scala 主题。 你能在 Java 中做同样的事情吗? 关于这个缺点,Less control over initialization. For example, it's trickier to initialize your object if you require runtime parameters.
。这正是我想要达到的目标。你有任何例子或者你见过这样做吗?我正在调用外部系统以获取数据库连接配置。所以理想情况下,我不想在每个执行程序上调用外部系统。我刚刚问了这个非常相似的问题。 ***.com/questions/47241882/…
初始化数据库连接的一种可能方法是通过系统属性(例如System.getProperty("db.host")
)【参考方案2】:
这对我来说比惰性初始化器更有效。我创建了一个初始化为 null 的对象级指针,并让每个执行器对其进行初始化。在初始化块中,您可以使用一次性代码。请注意,每个处理批次都会重置局部变量,但不会重置对象级变量。
object Thing1
var bigObject : BigObject = null
def main(args: Array[String]) : Unit =
val sc = <spark/scala magic here>
sc.textFile(infile).map(line =>
if (bigObject == null)
// this takes a minute but runs just once
bigObject = new BigObject(parameters)
bigObject.transform(line)
)
这种方法每个执行器只创建一个大对象,而不是其他方法的每个分区创建一个大对象。
如果您将 var bigObject : BigObject = null 放在主函数命名空间中,它的行为会有所不同。在这种情况下,它会在每个分区(即批处理)的开头运行 bigObject 构造函数。如果您有内存泄漏,那么这最终会杀死执行程序。垃圾收集也需要做更多的工作。
【讨论】:
如果您的spark.executor.cores
大于1,这将多次调用new BigObject
。惰性方法可防止并发初始化。
@Dan 你的意思是lazy var bigObject ...
?
@Dale,您的代码在技术上不是线程安全的,因为如果多个执行器同时运行,它们可以初始化您的全局对象。【参考方案3】:
这是我们通常做的事情
定义一个做这些事情的单例客户端,以确保每个执行程序中只有一个客户端
有一个 getorcreate 方法来创建或获取客户端信息,通常让您有一个要为多个不同模型提供服务的通用服务平台,然后我们可以使用类似 concurrentmap 来确保线程安全和无计算
getorcreate 方法将在 RDD 级别内调用,如 transform 或 foreachpartition,因此请确保 init 发生在执行程序级别
【讨论】:
【参考方案4】:您可以通过广播一个带有惰性 val 的案例对象来实现这一点,如下所示:
case object localSlowTwo lazy val value: Int = Thread.sleep(1000); 2
val broadcastSlowTwo = sc.broadcast(localSlowTwo)
(1 to 1000).toDS.repartition(100).map(_ * broadcastSlowTwo.value.value).collect
三个线程的三个执行器上的事件时间线如下所示:
从同一个 spark-shell 会话再次运行最后一行不再初始化:
【讨论】:
以上是关于如何在火花中对每个执行器执行一次操作的主要内容,如果未能解决你的问题,请参考以下文章