在Apache Spark上为每个工作者创建一个单例

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Apache Spark上为每个工作者创建一个单例相关的知识,希望对你有一定的参考价值。

假设我想使用创建某种程度昂贵的对象来映射RDD。我希望每个worker / thread拥有一个这个对象,并且必须在处理每个worker上的RDD分区的项之前创建它。

我的解决方案是:

    final Function0<ModelEvaluator> f = () -> {

        if (ModelEvaluator.getInstance() == null) {
            ModelEvaluator m = new ModelEvaluator(script);
            ModelEvaluator.setInstance(m);
        }

        return ModelEvaluator.getInstance();
    };

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
            (t) -> {
                try {
                    double val = f.call().evaluateModel(t);
                    return new Tuple2<>(val, t);
                } catch (Exception ex) {
                    return null;
                }
            }
    );



public class ModelEvaluator {

  private static ModelEvaluator instance;

  public static void setInstance(ModelEvaluator instance) {
    ModelEvaluator.instance = instance;
  }

  public static ModelEvaluator getInstance() {
      return instance;
  } 
...

在这种情况下,“ModelEvaluator”对象解析脚本,然后使用“服务”对象列表来配置模型参数,以便计算该参数配置的关联响应度量。但是每次处理RDD行时我都不想解析脚本。

我还配置了我的集群为每个集群创建一个进程,每个进程只生成一个worker,因为在同一进程中同一进程中多个worker访问一个具有可变状态的单例实例会有问题。

对我的问题有更优雅的解决方案吗?

答案

这可以使用Broadcast变量来完成。这将允许您在驱动程序上创建一个对象,并且每个工作程序将根据需要发送一个对象。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script));

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
        (t) -> {
            try {
                double val = model.value().evaluateModel(t);
                return new Tuple2<>(val, t);
            } catch (Exception ex) {
                return null;
            }
        }
);

以上是关于在Apache Spark上为每个工作者创建一个单例的主要内容,如果未能解决你的问题,请参考以下文章

ITOP上为工单创建执行者

.NET for Apache Spark 预览版正式发布

在Windows上为localhost创建自签名证书

如何为 apache spark worker 更改每个节点的内存

使用 Apache Kudu 实现多租户

Apache Spark:从IDE远程运行作业时的无限循环