根据间隔pyspark中的记录数增加一列
Posted
技术标签:
【中文标题】根据间隔pyspark中的记录数增加一列【英文标题】:Increment a column based on number of records in a interval pyspark 【发布时间】:2021-03-24 19:32:32 【问题描述】:我在pyspark
中有一个如下所示的数据框
df = sqlContext.createDataFrame(
[(1,'Y','Y',0,0,0,2,'Y','N','Y','Y'),
(2,'N','Y',2,1,2,3,'N','Y','Y','N'),
(3,'Y','N',3,1,0,0,'N','N','N','N'),
(4,'N','Y',5,0,1,0,'N','N','N','Y'),
(5,'Y','N',2,2,0,1,'Y','N','N','Y'),
(6,'Y','Y',0,0,3,6,'Y','N','Y','N'),
(7,'N','N',1,1,3,4,'N','Y','N','Y')
],
('id', 'compatible', 'product', 'ios', 'pc', 'other', 'devices', 'customer', 'subscriber', 'circle', 'smb')
)
df.show()
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+
| 1| Y| Y| 0| 0| 0| 2| Y| N| Y| Y|
| 2| N| Y| 2| 1| 2| 3| N| Y| Y| N|
| 3| Y| N| 3| 1| 0| 0| N| N| N| N|
| 4| N| Y| 5| 0| 1| 0| N| N| N| Y|
| 5| Y| N| 2| 2| 0| 1| Y| N| N| Y|
| 6| Y| Y| 0| 0| 3| 6| Y| N| Y| N|
| 7| N| N| 1| 1| 3| 4| N| Y| N| Y|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+
现在我想通过为记录组分配相同的值,在数据框中创建一个新列 p_no
。例如两条记录是一个组
我的意思是我想将值 0
分配给前两条记录,并为接下来的两条记录增加 1
的值,依此类推
预期结果:
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| 1| Y| Y| 0| 0| 0| 2| Y| N| Y| Y| 1|
| 2| N| Y| 2| 1| 2| 3| N| Y| Y| N| 1|
| 3| Y| N| 3| 1| 0| 0| N| N| N| N| 2|
| 4| N| Y| 5| 0| 1| 0| N| N| N| Y| 2|
| 5| Y| N| 2| 2| 0| 1| Y| N| N| Y| 3|
| 6| Y| Y| 0| 0| 3| 6| Y| N| Y| N| 3|
| 7| N| N| 1| 1| 3| 4| N| Y| N| Y| 4|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
我应用了row_number
函数,如下所示
import pyspark.sql.functions as f
from pyspark.sql import Window
df1 = df.withColumn("p_no",f.row_number().over(Window.partitionBy()))
输出如下
df1.show()
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| 1| Y| Y| 0| 0| 0| 2| Y| N| Y| Y| 1|
| 2| N| Y| 2| 1| 2| 3| N| Y| Y| N| 2|
| 3| Y| N| 3| 1| 0| 0| N| N| N| N| 3|
| 4| N| Y| 5| 0| 1| 0| N| N| N| Y| 4|
| 5| Y| N| 2| 2| 0| 1| Y| N| N| Y| 5|
| 6| Y| Y| 0| 0| 3| 6| Y| N| Y| N| 6|
| 7| N| N| 1| 1| 3| 4| N| Y| N| Y| 7|
| 8| Y| Y| 1| 1| 2| 0| Y| Y| N| N| 8|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
我不确定我需要做什么。 我怎样才能达到我想要的效果
【问题讨论】:
【参考方案1】:将行号除以 2 就可以了:
df2 = df.withColumn(
"p_no",
f.round(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int')
)
df2.show()
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|p_no|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
| 1| Y| Y| 0| 0| 0| 2| Y| N| Y| Y| 1|
| 2| N| Y| 2| 1| 2| 3| N| Y| Y| N| 1|
| 3| Y| N| 3| 1| 0| 0| N| N| N| N| 2|
| 4| N| Y| 5| 0| 1| 0| N| N| N| Y| 2|
| 5| Y| N| 2| 2| 0| 1| Y| N| N| Y| 3|
| 6| Y| Y| 0| 0| 3| 6| Y| N| Y| N| 3|
| 7| N| N| 1| 1| 3| 4| N| Y| N| Y| 4|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+----+
或者,如果 id 是连续的,您可以将 id 除以 2 进行四舍五入。
【讨论】:
这里我给出了一个示例 df 但df1 = df.withColumn("p_no", f.round(f.row_number().over(Window.orderBy('product'))/5).cast('int'))
这给了我不同的结果。我想要的是在df
中获取前两条记录而不考虑任何列,并将0
分配为新列的值。然后为接下来的两条记录将新列的值增加1
如果您不想考虑任何列,那么您可以通过f.lit(None)
订购。看到我编辑的答案了吗?以上是关于根据间隔pyspark中的记录数增加一列的主要内容,如果未能解决你的问题,请参考以下文章