在Flink中广播HashMap
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Flink中广播HashMap相关的知识,希望对你有一定的参考价值。
我正在使用Flink v.1.4.0
。
我正在使用DataSet
API,我想尝试的其中一个与Apache Spark
中广播变量的使用方式非常相似。
实际上,我想在DataSet
上应用一个map函数,遍历DataSet
中的每个元素并在HashMap
中搜索它;如果搜索元素存在于Map中,则检索相应的值。
HashMap
非常大,我不知道(因为我甚至没有建立我的解决方案)它需要Serializable
同时传输和使用所有工人。
一般来说,我想到的解决方案看起来像这样:
Map<String, T> hashMap = new ... ;
DataSet<Point> points = env.readCsv(...);
points
.map(point -> hashMap.getOrDefault(point.getId, 0))
...
但我不知道这是否有效,或者它是否有效。在做了一些搜索之后,我发现了一个更好的例子here,根据哪个人可以在Broadcast
中使用Flink
变量来广播List
如下:
DataSet<Point> points = env.readCsv(...);
DataSet<Centroid> centroids = ... ; // some computation
points.map(new RichMapFunction<Point, Integer>() {
private List<Centroid> centroids;
@Override
public void open(Configuration parameters) {
this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
@Override
public Integer map(Point p) {
return selectCentroid(centroids, p);
}
}).withBroadcastSet("centroids", centroids);
然而,.getBroadcastVariable()
似乎只适用于List
。
- 有人可以用
HashMap
提供替代解决方案吗? - 该解决方案将如何运作?
- 解决这个问题最有效的方法是什么?
- 可以使用Flink管理状态来执行类似于广播变量的使用方式吗?怎么样?
- 最后,我可以尝试在管道中使用多个广播变量的多个
mappings
吗?
答案
hashMap
的价值来自哪里?另外两个可能的解决方
- 在
hashMap
方法中,分别在过滤/映射运算符的每个实例中重新初始化/重新创建/重新生成open。每条记录可能效率更高,但重复初始化逻辑。 - 创建两个
DataSet
,一个用于hashMap
值,第二个用于points
和join这两个DataSet
s使用desired join strategy。作为类比,您尝试做的事情可以通过SQL查询SELECT * FROM points p, hashMap h WHERE h.key = p.id
来表达。
以上是关于在Flink中广播HashMap的主要内容,如果未能解决你的问题,请参考以下文章
在 Rails 中广播消息时未调用 ActionCable 频道