我如何使用 GroupBy 而不是 Map over Dataset?
Posted
技术标签:
【中文标题】我如何使用 GroupBy 而不是 Map over Dataset?【英文标题】:How can I use GroupBy and than Map over Dataset? 【发布时间】:2019-01-23 10:21:55 【问题描述】:我正在使用Datasets
并尝试分组,然后使用地图。
我正在设法使用 RDD,但在分组后使用数据集,我没有使用地图的选项。
有什么办法可以做到吗?
【问题讨论】:
欢迎来到 ***。在发布问题之前,我鼓励您查看是否有人已经问过同样的问题。关于这个话题,请参考this question Spark converting a Dataset to RDD的可能重复 嘿,安东尼奥,我不想使用 RDD。我想在数据集中这样做以获得优化器的好处 @AvshalomOrenstein 你不会在这里得到任何优化器的好处。请参阅 [DataFrame / Dataset groupBy behavior/optimization](***.com/q/32902982/6910411) 和 Spark 2.0 Dataset vs DataFrame 以及在接受的答案中引用的文档。 @AntonioCalì 那么如果我想使用数据集并优化性能,最好的方法是什么? 【参考方案1】:您可以申请groupByKey
:
def groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
(Scala-specific)返回一个 KeyValueGroupedDataset,其中数据按给定的键 func 分组。
返回KeyValueGroupedDataset
,然后返回mapGroups
:
def mapGroups[U](f: (K, Iterator[V]) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
(Scala-specific)将给定的函数应用于每组数据。对于每个唯一组,该函数将传递组键和包含组中所有元素的迭代器。该函数可以返回任意类型的元素,该元素将作为新的数据集返回。
此函数不支持部分聚合,因此需要对 Dataset 中的所有数据进行混洗。如果应用程序打算对每个键执行聚合,最好使用 reduce 函数或 org.apache.spark.sql.expressions#Aggregator。
在内部,如果任何给定的组太大而无法放入内存,则实现将溢出到磁盘。但是,用户必须注意避免将组的整个迭代器具体化(例如,通过调用 toList),除非他们确信在考虑到集群的内存限制的情况下这是可能的。
【讨论】:
以上是关于我如何使用 GroupBy 而不是 Map over Dataset?的主要内容,如果未能解决你的问题,请参考以下文章
Pandas groupby 聚合以截断最早日期而不是最旧日期
Laravel groupBy 返回项目数组而不是使用日期作为键
当我在 nodejs 中使用 sequelize 进行 groupby 时,我得到计数为 1 而不是 0
为啥我得到 null 而不是 Map 对象?如何解决这个问题?