如何使用 pyspark 管理跨集群的数据帧的物理数据放置?

Posted

技术标签:

【中文标题】如何使用 pyspark 管理跨集群的数据帧的物理数据放置?【英文标题】:How to manage physical data placement of a dataframe across the cluster with pyspark? 【发布时间】:2020-02-03 22:28:07 【问题描述】:

假设我有一个 pyspark 数据框“数据”,如下所示。我想按“期间”对数据进行分区。相反,我希望将每个时期的数据存储在它自己的分区上(参见下面的“数据”数据框下面的示例)。

data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))

data.show(100)

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   1|     2|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   2|     2|     0|4693.03|        0|
|   3|     1|     2|9565.93|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     1|     1|3549.66|        0|
|   5|     2|     5|3549.66|        1|
|   6|     1|     1| 401.52|        0|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+

举例

分区 1:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   3|     1|     2|9565.93|        0|
|   5|     1|     1|3549.66|        0|
|   6|     1|     1| 401.52|        0|
+----+------+------+-------+---------+

分区 2:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     2|     0|14277.4|        0|
|   2|     2|     0|4693.03|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     2|     5|3549.66|        1|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+

【问题讨论】:

【参考方案1】:

方法应该是先重新分区,获得正确的分区数(唯一周期数),然后在保存之前按 Period 列分区。

from pyspark.sql import functions as F
n = data.select(F.col('Period')).distinct().count()

data.repartition(n)\
     .write \
     .partitionBy("Period")\
     .mode("overwrite")\
     .format("parquet")\
     .saveAsTable("testing")

【讨论】:

我的数据有 11 亿多行和大约 160 列。这种方法需要很长时间。有没有更有效的方法来做到这一点? 重新分区是一个随机操作,这意味着它将重新分配所有数据,所以是的,这需要时间..您可以在重新分配之后尝试 df.persist(StorageLevel.MEMORY_AND_DISK),但在写入之前。它可能会运行得更快,具体取决于您的配置 你的答案和 df.repartition(n, col('colname')) 有什么区别? 在 df.repartition(n, col('colname')) 中,这意味着来自 'colname' 的有限数量的值(n)进入每个分区。我的理由是当我们已经在使用 partitionby 时为什么要添加额外的步骤。您可以尝试 df.repartition(numPartitions, cols).write.partitionBy(cols) 并比较性能。此链接将对您的问题有更深入的回答:***.com/questions/50775870/…

以上是关于如何使用 pyspark 管理跨集群的数据帧的物理数据放置?的主要内容,如果未能解决你的问题,请参考以下文章

如何按行将函数应用于 PySpark 数据帧的一组列?

如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?

如何使用具有不同列号pyspark的两个数据帧的并集

pyspark:如何获取 spark 数据帧的 Spark SQLContext?

如何比较来自 PySpark 数据帧的记录

从集群将整数/字符串写入 pyspark 中的文本文件