根据间隔pyspark中的记录数增加一列

Posted

技术标签:

【中文标题】根据间隔pyspark中的记录数增加一列【英文标题】:Increment a column based on number of records in a interval pyspark 【发布时间】:2021-03-24 19:32:32 【问题描述】:

我在pyspark 中有一个如下所示的数据框

df = sqlContext.createDataFrame(
[(1,'Y','Y',0,0,0,2,'Y','N','Y','Y'),
(2,'N','Y',2,1,2,3,'N','Y','Y','N'),
(3,'Y','N',3,1,0,0,'N','N','N','N'),
(4,'N','Y',5,0,1,0,'N','N','N','Y'),
(5,'Y','N',2,2,0,1,'Y','N','N','Y'),
(6,'Y','Y',0,0,3,6,'Y','N','Y','N'),
(7,'N','N',1,1,3,4,'N','Y','N','Y')
],
('id', 'compatible', 'product', 'ios', 'pc', 'other', 'devices', 'customer', 'subscriber', 'circle', 'smb')
)


df.show()

+---+----------+-------+---+---+-----+-------+--------+----------+------+---+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+
|  1|         Y|      Y|  0|  0|    0|      2|       Y|         N|     Y|  Y|
|  2|         N|      Y|  2|  1|    2|      3|       N|         Y|     Y|  N|
|  3|         Y|      N|  3|  1|    0|      0|       N|         N|     N|  N|
|  4|         N|      Y|  5|  0|    1|      0|       N|         N|     N|  Y|
|  5|         Y|      N|  2|  2|    0|      1|       Y|         N|     N|  Y|
|  6|         Y|      Y|  0|  0|    3|      6|       Y|         N|     Y|  N|
|  7|         N|      N|  1|  1|    3|      4|       N|         Y|     N|  Y|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+

现在我想通过为记录组分配相同的值,在数据框中创建一个新列 p_no。例如两条记录是一个组

我的意思是我想将值 0 分配给前两条记录,并为接下来的两条记录增加 1 的值,依此类推

预期结果:

+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
|  1|         Y|      Y|  0|  0|    0|      2|       Y|         N|     Y|  Y|   1|
|  2|         N|      Y|  2|  1|    2|      3|       N|         Y|     Y|  N|   1|
|  3|         Y|      N|  3|  1|    0|      0|       N|         N|     N|  N|   2|
|  4|         N|      Y|  5|  0|    1|      0|       N|         N|     N|  Y|   2|
|  5|         Y|      N|  2|  2|    0|      1|       Y|         N|     N|  Y|   3|
|  6|         Y|      Y|  0|  0|    3|      6|       Y|         N|     Y|  N|   3|
|  7|         N|      N|  1|  1|    3|      4|       N|         Y|     N|  Y|   4|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+

我应用了row_number 函数,如下所示

import pyspark.sql.functions as f
from pyspark.sql import Window

df1 = df.withColumn("p_no",f.row_number().over(Window.partitionBy()))

输出如下

df1.show()  

+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
|  1|         Y|      Y|  0|  0|    0|      2|       Y|         N|     Y|  Y|   1|
|  2|         N|      Y|  2|  1|    2|      3|       N|         Y|     Y|  N|   2|
|  3|         Y|      N|  3|  1|    0|      0|       N|         N|     N|  N|   3|
|  4|         N|      Y|  5|  0|    1|      0|       N|         N|     N|  Y|   4|
|  5|         Y|      N|  2|  2|    0|      1|       Y|         N|     N|  Y|   5|
|  6|         Y|      Y|  0|  0|    3|      6|       Y|         N|     Y|  N|   6|
|  7|         N|      N|  1|  1|    3|      4|       N|         Y|     N|  Y|   7|
|  8|         Y|      Y|  1|  1|    2|      0|       Y|         Y|     N|  N|   8|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+

我不确定我需要做什么。 我怎样才能达到我想要的效果

【问题讨论】:

【参考方案1】:

将行号除以 2 就可以了:

df2 = df.withColumn(
    "p_no",
    f.round(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int')
)

df2.show()
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
|  1|         Y|      Y|  0|  0|    0|      2|       Y|         N|     Y|  Y|   1|
|  2|         N|      Y|  2|  1|    2|      3|       N|         Y|     Y|  N|   1|
|  3|         Y|      N|  3|  1|    0|      0|       N|         N|     N|  N|   2|
|  4|         N|      Y|  5|  0|    1|      0|       N|         N|     N|  Y|   2|
|  5|         Y|      N|  2|  2|    0|      1|       Y|         N|     N|  Y|   3|
|  6|         Y|      Y|  0|  0|    3|      6|       Y|         N|     Y|  N|   3|
|  7|         N|      N|  1|  1|    3|      4|       N|         Y|     N|  Y|   4|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+

或者,如果 id 是连续的,您可以将 id 除以 2 进行四舍五入。

【讨论】:

这里我给出了一个示例 df 但df1 = df.withColumn("p_no", f.round(f.row_number().over(Window.orderBy('product'))/5).cast('int')) 这给了我不同的结果。我想要的是在df 中获取前两条记录而不考虑任何列,并将0 分配为新列的值。然后为接下来的两条记录将新列的值增加1 如果您不想考虑任何列,那么您可以通过f.lit(None) 订购。看到我编辑的答案了吗?

以上是关于根据间隔pyspark中的记录数增加一列的主要内容,如果未能解决你的问题,请参考以下文章

根据列最大值获取记录 - 在 PySpark

PySpark 从 TimeStampType 列向 DataFrame 添加一列

PySpark - 从列表中获取字符串位置

向现有的pyspark数据框添加一列

PYSPARK:根据条件用另一个行值更新一行中的值?

pyspark 数据框如果不存在则添加一列