在Flink中广播HashMap

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Flink中广播HashMap相关的知识,希望对你有一定的参考价值。

我正在使用Flink v.1.4.0

我正在使用DataSet API,我想尝试的其中一个与Apache Spark中广播变量的使用方式非常相似。

实际上,我想在DataSet上应用一个map函数,遍历DataSet中的每个元素并在HashMap中搜索它;如果搜索元素存在于Map中,则检索相应的值。

HashMap非常大,我不知道(因为我甚至没有建立我的解决方案)它需要Serializable同时传输和使用所有工人。

一般来说,我想到的解决方案看起来像这样:

Map<String, T> hashMap = new ... ;

DataSet<Point> points = env.readCsv(...);

points
  .map(point -> hashMap.getOrDefault(point.getId, 0))
  ...

但我不知道这是否有效,或者它是否有效。在做了一些搜索之后,我发现了一个更好的例子here,根据哪个人可以在Broadcast中使用Flink变量来广播List如下:

DataSet<Point> points = env.readCsv(...);

DataSet<Centroid> centroids = ... ; // some computation

points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p);
    }

}).withBroadcastSet("centroids", centroids);

然而,.getBroadcastVariable()似乎只适用于List

  • 有人可以用HashMap提供替代解决方案吗?
  • 该解决方案将如何运作?
  • 解决这个问题最有效的方法是什么?
  • 可以使用Flink管理状态来执行类似于广播变量的使用方式吗?怎么样?
  • 最后,我可以尝试在管道中使用多个广播变量的多个mappings吗?
答案

hashMap的价值来自哪里?另外两个可能的解决方

  1. hashMap方法中,分别在过滤/映射运算符的每个实例中重新初始化/重新创建/重新生成open。每条记录可能效率更高,但重复初始化逻辑。
  2. 创建两个DataSet,一个用于hashMap值,第二个用于pointsjoin这两个DataSets使用desired join strategy。作为类比,您尝试做的事情可以通过SQL查询SELECT * FROM points p, hashMap h WHERE h.key = p.id来表达。

以上是关于在Flink中广播HashMap的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark 中广播 RDD?

让动作电缆在 Sidekiq 作业中广播

在 Rails 中广播消息时未调用 ActionCable 频道

Android中广播的发送BroadcastReceiver

如何在pyspark中广播一个巨大的rdd?

如何在自组织网络中广播?