groupBy 数据集每个键的数量有限制

Posted

技术标签:

【中文标题】groupBy 数据集每个键的数量有限制【英文标题】:groupBy Dataset per number of keys with a limit 【发布时间】:2021-10-18 22:49:18 【问题描述】:

基于特定键聚合数据集,但将聚合列表限制为固定数量。

附上创建数据集的代码。需要帮助来实现类似于 grouped() 与列表一起使用的机制。

  case class AggrBook(
                city: String,
                state:String,
                books:List[Int]
              )

  case class Bookings(bookingId: Int,
                  userId:String,
                  city: String,
                  state:String
                 )


val spark = SparkSession.builder.master("local")getOrCreate()

import spark.sqlContext.implicits._
val bookDS = spark.createDataset (
  Seq(
    Bookings(1, "ames", "Eureka", "CA"),
    Bookings(2, "cha", "Eureka", "CA"),
    Bookings(3, "ygy", "Eureka", "CA"),
    Bookings(4, "6fsj", "Kettlemen", "AK"),
    Bookings(5, "skj", "Eureka", "CA"),
    Bookings(6, "po", "Irvine", "CA")
  )
)
bookDS.show


val dsGrouped: Dataset[AggrBook] = bookDS.groupByKey(r => (r.city, r.state))
  .mapGroups((key, value) => AggrBook(key._2, key._1, value.map(_.bookingId).toList))
dsGrouped.show()    

 

按最多 2 个 bookingID 分组,按州、每条记录的城市聚合数据集。

我的结果

+----+---------+------------+
|city|    state|       books|
+----+---------+------------+
|  CA|   Eureka|[1, 2, 3, 5]|
|  AK|Kettlemen|         [4]|
|  CA|   Irvine|         [6]|
+----+---------+------------+
    

期待:

+----+---------+------------+
|city|    state|       books|
+----+---------+------------+
|  CA|   Eureka|[1, 2]      |
|  CA|   Eureka|[3, 5]      |
|  AK|Kettlemen|[4]         |
|  CA|   Irvine|[6]         |
+----+---------+------------+

【问题讨论】:

您能否更新您的问题以包含myDataSet 的定义?包括它的类型定义。例如DataFrameDataset[Something] 已更新。谢谢。 你能把这行加进去吗:val myDataSet: <type-here> = ... 用示例数据更新了问题 【参考方案1】:

我想不出一种简单的方法来仅使用本机 Spark 函数来实现您想要的,但可以使用如下用户定义函数 (UDF) 来完成:

val groupBy2 = udf((s: Seq[Int]) => s.grouped(2).toList)
ss.udf.register("groupBy2", groupBy2)
val dsGrouped: Dataset[AggrBook] = bookDS.groupBy("city", "state").agg(collect_list("bookingId") as "books")
  .withColumn("books", explode(groupBy2(col("books"))))
  .as[AggrBook]

UDF 采用 Seq[Int] 并返回 Seq[Seq[Int]],其中内部序列的长度为 2 或更少。然后使用本机 Spark explode 函数对其进行“扩展”,为每个“城市-州”对提供(可能)多行,但“书籍”列中只有两个 ID。

【讨论】:

以上是关于groupBy 数据集每个键的数量有限制的主要内容,如果未能解决你的问题,请参考以下文章

Spark join/groupby 数据集需要很多时间

如何使用tensorflow为每个类获取具有相同数量图像的验证集?

无法将 groupby 数据集转换为 pandas 中的 json [重复]

Spark - 使用 groupBy 减少组合数量

Apache Spark SQL数据集groupBy具有max函数和另一列中的不同值

为数据集创建了外部 Bigquery 配置单元分区表,但无法查看具有分区键的列