如何使用 pyspark 管理跨集群的数据帧的物理数据放置?
Posted
技术标签:
【中文标题】如何使用 pyspark 管理跨集群的数据帧的物理数据放置?【英文标题】:How to manage physical data placement of a dataframe across the cluster with pyspark? 【发布时间】:2020-02-03 22:28:07 【问题描述】:假设我有一个 pyspark 数据框“数据”,如下所示。我想按“期间”对数据进行分区。相反,我希望将每个时期的数据存储在它自己的分区上(参见下面的“数据”数据框下面的示例)。
data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))
data.show(100)
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 1| 2| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 2| 2| 0|4693.03| 0|
| 3| 1| 2|9565.93| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 1| 1|3549.66| 0|
| 5| 2| 5|3549.66| 1|
| 6| 1| 1| 401.52| 0|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
举例
分区 1:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 3| 1| 2|9565.93| 0|
| 5| 1| 1|3549.66| 0|
| 6| 1| 1| 401.52| 0|
+----+------+------+-------+---------+
分区 2:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 2| 0|14277.4| 0|
| 2| 2| 0|4693.03| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 2| 5|3549.66| 1|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
【问题讨论】:
【参考方案1】:方法应该是先重新分区,获得正确的分区数(唯一周期数),然后在保存之前按 Period 列分区。
from pyspark.sql import functions as F
n = data.select(F.col('Period')).distinct().count()
data.repartition(n)\
.write \
.partitionBy("Period")\
.mode("overwrite")\
.format("parquet")\
.saveAsTable("testing")
【讨论】:
我的数据有 11 亿多行和大约 160 列。这种方法需要很长时间。有没有更有效的方法来做到这一点? 重新分区是一个随机操作,这意味着它将重新分配所有数据,所以是的,这需要时间..您可以在重新分配之后尝试 df.persist(StorageLevel.MEMORY_AND_DISK),但在写入之前。它可能会运行得更快,具体取决于您的配置 你的答案和 df.repartition(n, col('colname')) 有什么区别? 在 df.repartition(n, col('colname')) 中,这意味着来自 'colname' 的有限数量的值(n)进入每个分区。我的理由是当我们已经在使用 partitionby 时为什么要添加额外的步骤。您可以尝试 df.repartition(numPartitions, cols).write.partitionBy(cols) 并比较性能。此链接将对您的问题有更深入的回答:***.com/questions/50775870/…以上是关于如何使用 pyspark 管理跨集群的数据帧的物理数据放置?的主要内容,如果未能解决你的问题,请参考以下文章
如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?