在Apache Spark上为每个工作者创建一个单例
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Apache Spark上为每个工作者创建一个单例相关的知识,希望对你有一定的参考价值。
假设我想使用创建某种程度昂贵的对象来映射RDD。我希望每个worker / thread拥有一个这个对象,并且必须在处理每个worker上的RDD分区的项之前创建它。
我的解决方案是:
final Function0<ModelEvaluator> f = () -> {
if (ModelEvaluator.getInstance() == null) {
ModelEvaluator m = new ModelEvaluator(script);
ModelEvaluator.setInstance(m);
}
return ModelEvaluator.getInstance();
};
JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
(t) -> {
try {
double val = f.call().evaluateModel(t);
return new Tuple2<>(val, t);
} catch (Exception ex) {
return null;
}
}
);
public class ModelEvaluator {
private static ModelEvaluator instance;
public static void setInstance(ModelEvaluator instance) {
ModelEvaluator.instance = instance;
}
public static ModelEvaluator getInstance() {
return instance;
}
...
在这种情况下,“ModelEvaluator”对象解析脚本,然后使用“服务”对象列表来配置模型参数,以便计算该参数配置的关联响应度量。但是每次处理RDD行时我都不想解析脚本。
我还配置了我的集群为每个集群创建一个进程,每个进程只生成一个worker,因为在同一进程中同一进程中多个worker访问一个具有可变状态的单例实例会有问题。
对我的问题有更优雅的解决方案吗?
答案
这可以使用Broadcast
变量来完成。这将允许您在驱动程序上创建一个对象,并且每个工作程序将根据需要发送一个对象。
final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script));
JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
(t) -> {
try {
double val = model.value().evaluateModel(t);
return new Tuple2<>(val, t);
} catch (Exception ex) {
return null;
}
}
);
以上是关于在Apache Spark上为每个工作者创建一个单例的主要内容,如果未能解决你的问题,请参考以下文章