Spark 按 Key 分组并对数据进行分区
Posted
技术标签:
【中文标题】Spark 按 Key 分组并对数据进行分区【英文标题】:Spark group by Key and partitioning the data 【发布时间】:2021-02-05 21:03:27 【问题描述】:我有一个包含以下格式数据的大型 csv 文件。
cityId1,name,address,.......,zip
cityId2,name,address,.......,zip
cityId1,name,address,.......,zip
........
cityIdN,name,address,.......,zip
我正在对上述 csv 文件执行以下操作:
按cityId作为key,资源列表作为value分组
df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")
改成jsonRDD
val jsonDataRdd2 = df2.toJSON.rdd
遍历每个分区并按密钥上传到 s3
我的问题:
spark 分区的默认大小是多少? 假设默认分区大小为 X MB,并且 dataFrame 中存在一条大记录,其键具有 Y MB 数据 (Y > X),在这种情况下会发生什么情况? 在这种情况下,我是否需要担心在不同分区中拥有相同的密钥?【问题讨论】:
很难跟上。你能显示代码吗? 这能回答你的问题吗? How does spark.csv determine the number of partitions on read? 你能接受答案吗? 【参考方案1】:回答您的问题:
从二级存储(S3、HDFS)读取时,分区等于文件系统的块大小,128MB 或 256MB;但您可以立即重新分区 RDD,而不是数据帧。 (对于 JDBC 和 Spark Structured Streaming,分区的大小是动态的。)
当应用“宽转换”并重新分区时,分区的数量和大小最有可能发生变化。给定分区的大小具有最大值。在 Spark 2.4.x 中,分区大小增加到 8GB。因此,如果任何转换(例如 collect_list 与 groupBy 的组合)生成超过此最大大小,您将收到错误并且程序中止。因此,您需要明智地进行分区,或者在您的情况下有足够数量的分区进行聚合 - 请参阅 spark.sql.shuffle.partitions 参数。
Spark 处理的并行模型依赖于通过散列、范围分区等分配的“键”分配到一个且只有一个分区 - 洗牌。所以,遍历一个分区foreachPartition,mapPartitions没有问题。
【讨论】:
以上是关于Spark 按 Key 分组并对数据进行分区的主要内容,如果未能解决你的问题,请参考以下文章