我如何使用 GroupBy 而不是 Map over Dataset?

Posted

技术标签:

【中文标题】我如何使用 GroupBy 而不是 Map over Dataset?【英文标题】:How can I use GroupBy and than Map over Dataset? 【发布时间】:2019-01-23 10:21:55 【问题描述】:

我正在使用Datasets 并尝试分组,然后使用地图。

我正在设法使用 RDD,但在分组后使用数据集,我没有使用地图的选项。

有什么办法可以做到吗?

【问题讨论】:

欢迎来到 ***。在发布问题之前,我鼓励您查看是否有人已经问过同样的问题。关于这个话题,请参考this question Spark converting a Dataset to RDD的可能重复 嘿,安东尼奥,我不想使用 RDD。我想在数据集中这样做以获得优化器的好处 @AvshalomOrenstein 你不会在这里得到任何优化器的好处。请参阅 [DataFrame / Dataset groupBy behavior/optimization](***.com/q/32902982/6910411) 和 Spark 2.0 Dataset vs DataFrame 以及在接受的答案中引用的文档。 @AntonioCalì 那么如果我想使用数据集并优化性能,最好的方法是什么? 【参考方案1】:

您可以申请groupByKey:

def groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]

(Scala-specific)返回一个 KeyValueGroupedDataset,其中数据按给定的键 func 分组。

返回KeyValueGroupedDataset,然后返回mapGroups

def mapGroups[U](f: (K, Iterator[V]) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]

(Scala-specific)将给定的函数应用于每组数据。对于每个唯一组,该函数将传递组键和包含组中所有元素的迭代器。该函数可以返回任意类型的元素,该元素将作为新的数据集返回。

此函数不支持部分聚合,因此需要对 Dataset 中的所有数据进行混洗。如果应用程序打算对每个键执行聚合,最好使用 reduce 函数或 org.apache.spark.sql.expressions#Aggregator。

在内部,如果任何给定的组太大而无法放入内存,则实现将溢出到磁盘。但是,用户必须注意避免将组的整个迭代器具体化(例如,通过调用 toList),除非他们确信在考虑到集群的内存限制的情况下这是可能的。

【讨论】:

以上是关于我如何使用 GroupBy 而不是 Map over Dataset?的主要内容,如果未能解决你的问题,请参考以下文章

Pandas groupby 聚合以截断最早日期而不是最旧日期

Laravel groupBy 返回项目数组而不是使用日期作为键

当我在 nodejs 中使用 sequelize 进行 groupby 时,我得到计数​​为 1 而不是 0

为啥我得到 null 而不是 Map 对象?如何解决这个问题?

pandas groupby 可以聚合成一个列表,而不是 sum、mean 等吗?

lodash groupby 使用字符串键而不是数字