Spark 按 Key 分组并对数据进行分区

Posted

技术标签:

【中文标题】Spark 按 Key 分组并对数据进行分区【英文标题】:Spark group by Key and partitioning the data 【发布时间】:2021-02-05 21:03:27 【问题描述】:

我有一个包含以下格式数据的大型 csv 文件。

cityId1,name,address,.......,zip

cityId2,name,address,.......,zip

cityId1,name,address,.......,zip

........

cityIdN,name,address,.......,zip

我正在对上述 csv 文件执行以下操作:

    按cityId作为key,资源列表作为value分组

    df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")

    改成jsonRDD

    val jsonDataRdd2 = df2.toJSON.rdd

    遍历每个分区并按密钥上传到 s3

由于业务逻辑限制(其他服务如何从 S3 读取),我无法使用 dataframe partitionby write

我的问题:

spark 分区的默认大小是多少? 假设默认分区大小为 X MB,并且 dataFrame 中存在一条大记录,其键具有 Y MB 数据 (Y > X),在这种情况下会发生什么情况? 在这种情况下,我是否需要担心在不同分区中拥有相同的密钥?

【问题讨论】:

很难跟上。你能显示代码吗? 这能回答你的问题吗? How does spark.csv determine the number of partitions on read? 你能接受答案吗? 【参考方案1】:

回答您的问题:

从二级存储(S3、HDFS)读取时,分区等于文件系统的块大小,128MB 或 256MB;但您可以立即重新分区 RDD,而不是数据帧。 (对于 JDBC 和 Spark Structured Streaming,分区的大小是动态的。)

当应用“宽转换”并重新分区时,分区的数量和大小最有可能发生变化。给定分区的大小具有最大值。在 Spark 2.4.x 中,分区大小增加到 8GB。因此,如果任何转换(例如 collect_list 与 groupBy 的组合)生成超过此最大大小,您将收到错误并且程序中止。因此,您需要明智地进行分区,或者在您的情况下有足够数量的分区进行聚合 - 请参阅 spark.sql.shuffle.partitions 参数。

Spark 处理的并行模型依赖于通过散列、范围分区等分配的“键”分配到一个且只有一个分区 - 洗牌。所以,遍历一个分区foreachPartition,mapPartitions没有问题。

【讨论】:

以上是关于Spark 按 Key 分组并对数据进行分区的主要内容,如果未能解决你的问题,请参考以下文章

如何按列值的计数进行分组并对其进行排序?

如何按一列分组并对另一列的值进行排序?

按“内部”分区键进行有效分组

Apache Spark SQL 按范围分组数据

如何按数组中的公共元素分组?

根据 pandas 中的字典对数据帧的行进行分组并对相应的分子求和