groupBy 数据集每个键的数量有限制
Posted
技术标签:
【中文标题】groupBy 数据集每个键的数量有限制【英文标题】:groupBy Dataset per number of keys with a limit 【发布时间】:2021-10-18 22:49:18 【问题描述】:基于特定键聚合数据集,但将聚合列表限制为固定数量。
附上创建数据集的代码。需要帮助来实现类似于 grouped() 与列表一起使用的机制。
case class AggrBook(
city: String,
state:String,
books:List[Int]
)
case class Bookings(bookingId: Int,
userId:String,
city: String,
state:String
)
val spark = SparkSession.builder.master("local")getOrCreate()
import spark.sqlContext.implicits._
val bookDS = spark.createDataset (
Seq(
Bookings(1, "ames", "Eureka", "CA"),
Bookings(2, "cha", "Eureka", "CA"),
Bookings(3, "ygy", "Eureka", "CA"),
Bookings(4, "6fsj", "Kettlemen", "AK"),
Bookings(5, "skj", "Eureka", "CA"),
Bookings(6, "po", "Irvine", "CA")
)
)
bookDS.show
val dsGrouped: Dataset[AggrBook] = bookDS.groupByKey(r => (r.city, r.state))
.mapGroups((key, value) => AggrBook(key._2, key._1, value.map(_.bookingId).toList))
dsGrouped.show()
按最多 2 个 bookingID 分组,按州、每条记录的城市聚合数据集。
我的结果
+----+---------+------------+
|city| state| books|
+----+---------+------------+
| CA| Eureka|[1, 2, 3, 5]|
| AK|Kettlemen| [4]|
| CA| Irvine| [6]|
+----+---------+------------+
期待:
+----+---------+------------+
|city| state| books|
+----+---------+------------+
| CA| Eureka|[1, 2] |
| CA| Eureka|[3, 5] |
| AK|Kettlemen|[4] |
| CA| Irvine|[6] |
+----+---------+------------+
【问题讨论】:
您能否更新您的问题以包含myDataSet
的定义?包括它的类型定义。例如DataFrame
或Dataset[Something]
。
已更新。谢谢。
你能把这行加进去吗:val myDataSet: <type-here> = ...
用示例数据更新了问题
【参考方案1】:
我想不出一种简单的方法来仅使用本机 Spark 函数来实现您想要的,但可以使用如下用户定义函数 (UDF) 来完成:
val groupBy2 = udf((s: Seq[Int]) => s.grouped(2).toList)
ss.udf.register("groupBy2", groupBy2)
val dsGrouped: Dataset[AggrBook] = bookDS.groupBy("city", "state").agg(collect_list("bookingId") as "books")
.withColumn("books", explode(groupBy2(col("books"))))
.as[AggrBook]
UDF 采用 Seq[Int]
并返回 Seq[Seq[Int]]
,其中内部序列的长度为 2 或更少。然后使用本机 Spark explode
函数对其进行“扩展”,为每个“城市-州”对提供(可能)多行,但“书籍”列中只有两个 ID。
【讨论】:
以上是关于groupBy 数据集每个键的数量有限制的主要内容,如果未能解决你的问题,请参考以下文章
如何使用tensorflow为每个类获取具有相同数量图像的验证集?
无法将 groupby 数据集转换为 pandas 中的 json [重复]