为pyspark数据框中的记录间隔分配一个常数值
Posted
技术标签:
【中文标题】为pyspark数据框中的记录间隔分配一个常数值【英文标题】:Assigning a constant value to interval of records in pyspark data frame 【发布时间】:2021-05-26 17:57:28 【问题描述】:我在pyspark
中有一个如下所示的数据框
df = spark.createDataFrame(
[(1,'Y'),
(2,'Y'),
(3,'N'),
(4,'N'),
(5,'N'),
(6,'Y'),
(7,'N')
],
('id', 'status')
)
df.show()
+---+------+
| id|status|
+---+------+
| 1| Y|
| 2| Y|
| 3| N|
| 4| N|
| 5| N|
| 6| Y|
| 7| N|
+---+------+
现在我想通过为记录组分配相同的值,在数据框中创建一个新列 pack
。例如两条记录是一个组
值应该从0
开始
预期结果:
+---+------+----+
| id|status|pack|
+---+------+----+
| 1| Y| 0|
| 2| Y| 0|
| 3| N| 1|
| 4| N| 1|
| 5| N| 2|
| 6| Y| 2|
| 7| N| 3|
+---+------+----+
我应用了row_number
函数,如下所示
import pyspark.sql.functions as f
from pyspark.sql import Window
df1 = df.withColumn( "pack", f.round(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int') )
df1.show()
+---+------+----+
| id|status|pack|
+---+------+----+
| 1| Y| 1|
| 2| Y| 1|
| 3| N| 2|
| 4| N| 2|
| 5| N| 3|
| 6| Y| 3|
| 7| N| 4|
+---+------+----+
尽管答案与我想要的相似,但值从 1
而不是 0
开始。
【问题讨论】:
您的代码 s 中的数据框与显示的一个检查索引 2 不同,但在显示中它有 Y,为什么? @Yefet 现已更正 simple -1 能解决您的问题吗?检查我的答案 【参考方案1】:# Create data frame
df = spark.createDataFrame(
[(1,'Y'),
(2,'Y'),
(3,'N'),
(4,'N'),
(5,'N'),
(6,'Y'),
(7,'N')
],
('id', 'status')
)
df.show()
+---+------+
| id|status|
+---+------+
| 1| Y|
| 2| Y|
| 3| N|
| 4| N|
| 5| N|
| 6| Y|
| 7| N|
+---+------+
# necessary imports
import pyspark.sql.functions as f
from pyspark.sql import Window
# apply ceil function and -1
df1 = df.withColumn( "pack", f.ceil(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int') - 1)
输出
+---+------+----+
| id|status|pack|
+---+------+----+
| 1| Y| 0|
| 2| Y| 0|
| 3| N| 1|
| 4| N| 1|
| 5| N| 2|
| 6| Y| 2|
| 7| N| 3|
+---+------+----+
【讨论】:
【参考方案2】:我会改变你创建 row_number 的方式。为确保您的数据按照与输入相同的方式排序,请改为 f.row_number().over(Window.orderBy(f.monotonically_increasing_id()))
。
import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 'Y'),
(2, 'Y'),
(3, 'N'),
(4, 'N'),
(5, 'N'),
(6, 'Y'),
(7, 'N'),
(10, 'Y'),
(11, 'N')
], ('id', 'status'))
df = (df
.withColumn('row_number', f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
.withColumn('increase', ((f.col('row_number') > f.lit(2)) &
(f.col('row_number') % f.lit(2) == f.lit(1))).cast('Int'))
.withColumn('pack', f.sum('increase').over(Window.orderBy('row_number')))
.drop('row_number', 'increase'))
df.show(truncate=False)
输出
+---+------+----+
|id |status|pack|
+---+------+----+
|1 |Y |0 |
|2 |Y |0 |
|3 |N |1 |
|4 |N |1 |
|5 |N |2 |
|6 |Y |2 |
|7 |N |3 |
|10 |Y |3 |
|11 |N |4 |
+---+------+----+
【讨论】:
当我除以2
时,您的答案有效,但是当我除以 10
时,结果与预期的输出不符
@nmr 我做了修复,再检查一遍
您的答案现在有效。但是 user12345 的答案很简单以上是关于为pyspark数据框中的记录间隔分配一个常数值的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?