如何在 Spark UDAF 中实现 fastutils 映射?

Posted

技术标签:

【中文标题】如何在 Spark UDAF 中实现 fastutils 映射?【英文标题】:How do I implement a fastutils map in a Spark UDAF? 【发布时间】:2019-02-05 23:47:53 【问题描述】:

我正在构建一个 Spark UDAF,我将中间数据存储在一个 fastutils 映射中。架构如下所示:

def bufferSchema = new StructType().add("my_map_col", MapType(StringType, IntegerType))

我初始化没有问题:

def initialize(buffer: MutableAggregationBuffer) = 
   buffer(0) = new Object2IntOpenHashMap[String]()

当我尝试更新时出现问题:

def update(buffer: MutableAggregationBuffer, input: Row) =  
  val myMap = buffer.getAs[Object2IntOpenHashMap[String]](0)
  myMap.put(input.getAs[String](0), 1)
  buffer(0) = myMap

得到以下错误:

Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap

我有什么办法可以做到这一点?

【问题讨论】:

【参考方案1】:

我有什么办法可以做到这一点?

不是真的。这个

buffer.getAs[Object2IntOpenHashMap[String]](0)

等价于

buffer.get(0).asInstanceOf[Object2IntOpenHashMap[String]]]

和the external type for MapType is scala.collection.Map

实际上,无论如何它都是死路一条 - UserDefinedAggregate 函数 make full copy of data on each call。 Aggregator 可能会让您的运气更好(如链接的问题中所示)。

【讨论】:

采用聚合器方法 - 现在从 bufferEncoder 收到错误“No Encoder found for Object2IntOpenHashMap[String]”。想法? 想通了 - 正在使用 ExpressionEncoder,但它不起作用。我的地图存储在一个案例类中,所以我所要做的就是 Encoders.kryo[MyCaseClass]。

以上是关于如何在 Spark UDAF 中实现 fastutils 映射?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark 中实现递归算法?

Hive 计数不同的 UDAF

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数

如何在 Spark 中实现“交叉连接”?

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

如何在 Spark SQL(PySpark) 中实现自增