如何对pyspark数据框中的单列进行重塑操作?

Posted

技术标签:

【中文标题】如何对pyspark数据框中的单列进行重塑操作?【英文标题】:How to do reshape operation on a single column in pyspark dataframe? 【发布时间】:2021-03-25 05:50:39 【问题描述】:

我有一个很长的 pyspark 数据框,如下所示:

+------+
|number|
+------+
|12.4  |
|13.4  |
|42.3  |
|33.4  |
|42.3  |
|32.4  |
|44.2  |
|12.3  |
|45.4  |
+------+

理想情况下,我希望将其重塑为 nxn 矩阵,其中 nsqrt(length of pyspark dataframe)

虽然有一个解决方案是将其转换为 numpy 数组,然后将其重塑为 nxn 矩阵,但我希望在 pyspark 中完成。因为我的数据超长(大约一亿行)。

所以我正在寻找的预期输出是这样的:

+------+------+------+
|12.4  | 13.4 | 42.3 |
|33.4  | 42.3 | 32.4 |
|44.2  | 12.3 | 45.4 |
+------+------+------+

虽然我能够通过将其转换为 pandas 然后转换为 numpy 然后进行重塑操作来正确地做到这一点。但我想在 Pyspark 本身中进行这种转换。因为下面的代码只适用于几千行。

covarianceMatrix_pd = covarianceMatrix_df.toPandas()
nrows = np.sqrt(len(covarianceMatrix_pd))
covarianceMatrix_pd = covarianceMatrix_pd.to_numpy().reshape((int(nrows),int(nrows)))
covarianceMatrix_pd

【问题讨论】:

【参考方案1】:

一种方法是在我们计算完数据帧后使用row_number 和pivot:

from pyspark.sql import functions as F, Window
from math import sqrt

c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))

out = (df.withColumn("Rnum",((rnum-1)/c).cast("Integer"))
 .withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("number")))

out.show()

+----+----+----+----+
|Rnum|   1|   2|   3|
+----+----+----+----+
|   0|12.4|13.4|42.3|
|   1|33.4|42.3|32.4|
|   2|44.2|12.3|45.4|
+----+----+----+----+

【讨论】:

我不知道由于某种原因它能够成功地以所需的方式进行转换。但是顺序不正确。我的数据也是负值的双重类型。我交叉检查了一个 4 行长的 df,其值为 - [-0.002323, 0.002232, 0.003323, -0.0014]。我从你的 sn-p 得到的输出是沿着这个 -[ [0.002232,-0.0014],[-0.002323, 0.003323]]。这不符合预期的结果。你可以交叉检查@anky 吗?我认为这是因为长数据框中的负值。 我检查了你所说的新代码。顺序还是不一样的。此外,我通过运行这个 - > df.withColumn("Rnum",((rnum-1)/c).cast("Integer")).show() 进行了交叉检查。执行此操作也会更改顺序(假设我在 df 中有负值) @VishalAnand 订单可能会改变,但是 Rnum 是否分配正确?如果是,那么这应该有效。我在社区版本中测试了您的示例并且它有效 如果我必须按这个顺序得到问题的输出怎么办。 [[12.4, 33.4, 44.2], [13.4,42.3, 12.3], [42.3, 32.4, 45.4]]。应该在代码中进行哪些更改以实现 @andy? @VishalAnand 尝试使用像df.withColumn("Rnum",(rnum-1)%c)这样的Rnum

以上是关于如何对pyspark数据框中的单列进行重塑操作?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作

如何对 Pyspark spark.sql 数据框中的数据进行同质化

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

如何访问pyspark数据框中的动态列

如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?

对数据框中的某些列进行插补