如何对pyspark数据框中的单列进行重塑操作?
Posted
技术标签:
【中文标题】如何对pyspark数据框中的单列进行重塑操作?【英文标题】:How to do reshape operation on a single column in pyspark dataframe? 【发布时间】:2021-03-25 05:50:39 【问题描述】:我有一个很长的 pyspark 数据框,如下所示:
+------+
|number|
+------+
|12.4 |
|13.4 |
|42.3 |
|33.4 |
|42.3 |
|32.4 |
|44.2 |
|12.3 |
|45.4 |
+------+
理想情况下,我希望将其重塑为 nxn
矩阵,其中 n
是 sqrt(length of pyspark dataframe)
。
虽然有一个解决方案是将其转换为 numpy 数组,然后将其重塑为 nxn
矩阵,但我希望在 pyspark 中完成。因为我的数据超长(大约一亿行)。
所以我正在寻找的预期输出是这样的:
+------+------+------+
|12.4 | 13.4 | 42.3 |
|33.4 | 42.3 | 32.4 |
|44.2 | 12.3 | 45.4 |
+------+------+------+
虽然我能够通过将其转换为 pandas 然后转换为 numpy 然后进行重塑操作来正确地做到这一点。但我想在 Pyspark 本身中进行这种转换。因为下面的代码只适用于几千行。
covarianceMatrix_pd = covarianceMatrix_df.toPandas()
nrows = np.sqrt(len(covarianceMatrix_pd))
covarianceMatrix_pd = covarianceMatrix_pd.to_numpy().reshape((int(nrows),int(nrows)))
covarianceMatrix_pd
【问题讨论】:
【参考方案1】:一种方法是在我们计算完数据帧后使用row_number
和pivot:
from pyspark.sql import functions as F, Window
from math import sqrt
c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))
out = (df.withColumn("Rnum",((rnum-1)/c).cast("Integer"))
.withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("number")))
out.show()
+----+----+----+----+
|Rnum| 1| 2| 3|
+----+----+----+----+
| 0|12.4|13.4|42.3|
| 1|33.4|42.3|32.4|
| 2|44.2|12.3|45.4|
+----+----+----+----+
【讨论】:
我不知道由于某种原因它能够成功地以所需的方式进行转换。但是顺序不正确。我的数据也是负值的双重类型。我交叉检查了一个 4 行长的 df,其值为 - [-0.002323, 0.002232, 0.003323, -0.0014]。我从你的 sn-p 得到的输出是沿着这个 -[ [0.002232,-0.0014],[-0.002323, 0.003323]]。这不符合预期的结果。你可以交叉检查@anky 吗?我认为这是因为长数据框中的负值。 我检查了你所说的新代码。顺序还是不一样的。此外,我通过运行这个 - >df.withColumn("Rnum",((rnum-1)/c).cast("Integer")).show()
进行了交叉检查。执行此操作也会更改顺序(假设我在 df 中有负值)
@VishalAnand 订单可能会改变,但是 Rnum 是否分配正确?如果是,那么这应该有效。我在社区版本中测试了您的示例并且它有效
如果我必须按这个顺序得到问题的输出怎么办。 [[12.4, 33.4, 44.2], [13.4,42.3, 12.3], [42.3, 32.4, 45.4]]。应该在代码中进行哪些更改以实现 @andy?
@VishalAnand 尝试使用像df.withColumn("Rnum",(rnum-1)%c)
这样的Rnum以上是关于如何对pyspark数据框中的单列进行重塑操作?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作
如何对 Pyspark spark.sql 数据框中的数据进行同质化
如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来