为pyspark数据框中的记录间隔分配一个常数值

Posted

技术标签:

【中文标题】为pyspark数据框中的记录间隔分配一个常数值【英文标题】:Assigning a constant value to interval of records in pyspark data frame 【发布时间】:2021-05-26 17:57:28 【问题描述】:

我在pyspark 中有一个如下所示的数据框

df = spark.createDataFrame(
[(1,'Y'),
(2,'Y'),
(3,'N'),
(4,'N'),
(5,'N'),
(6,'Y'),
(7,'N')
],
('id', 'status')
)


df.show()

+---+------+
| id|status|
+---+------+
|  1|     Y|
|  2|     Y|
|  3|     N|
|  4|     N|
|  5|     N|
|  6|     Y|
|  7|     N|
+---+------+

现在我想通过为记录组分配相同的值,在数据框中创建一个新列 pack。例如两条记录是一个组

值应该从0开始

预期结果:

+---+------+----+
| id|status|pack|
+---+------+----+
|  1|     Y|   0|
|  2|     Y|   0|
|  3|     N|   1|
|  4|     N|   1|
|  5|     N|   2|
|  6|     Y|   2|
|  7|     N|   3|
+---+------+----+

我应用了row_number 函数,如下所示

import pyspark.sql.functions as f
from pyspark.sql import Window

df1 = df.withColumn( "pack", f.round(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int') )

df1.show()
+---+------+----+
| id|status|pack|
+---+------+----+
|  1|     Y|   1|
|  2|     Y|   1|
|  3|     N|   2|
|  4|     N|   2|
|  5|     N|   3|
|  6|     Y|   3|
|  7|     N|   4|
+---+------+----+

尽管答案与我想要的相似,但值从 1 而不是 0 开始。

【问题讨论】:

您的代码 s 中的数据框与显示的一个检查索引 2 不同,但在显示中它有 Y,为什么? @Yefet 现已更正 simple -1 能解决您的问题吗?检查我的答案 【参考方案1】:
# Create data frame 
df = spark.createDataFrame(
[(1,'Y'),
(2,'Y'),
(3,'N'),
(4,'N'),
(5,'N'),
(6,'Y'),
(7,'N')
],
('id', 'status')
)


df.show()

+---+------+
| id|status|
+---+------+
|  1|     Y|
|  2|     Y|
|  3|     N|
|  4|     N|
|  5|     N|
|  6|     Y|
|  7|     N|
+---+------+

# necessary imports
import pyspark.sql.functions as f
from pyspark.sql import Window

# apply ceil function and -1
df1 = df.withColumn( "pack", f.ceil(f.row_number().over(Window.orderBy(f.lit(None)))/2).cast('int') - 1)

输出

+---+------+----+
| id|status|pack|
+---+------+----+
|  1|     Y|   0|
|  2|     Y|   0|
|  3|     N|   1|
|  4|     N|   1|
|  5|     N|   2|
|  6|     Y|   2|
|  7|     N|   3|
+---+------+----+

【讨论】:

【参考方案2】:

我会改变你创建 row_number 的方式。为确保您的数据按照与输入相同的方式排序,请改为 f.row_number().over(Window.orderBy(f.monotonically_increasing_id()))

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 'Y'),
     (2, 'Y'),
     (3, 'N'),
     (4, 'N'),
     (5, 'N'),
     (6, 'Y'),
     (7, 'N'),
     (10, 'Y'),
     (11, 'N')
     ], ('id', 'status'))

df = (df
      .withColumn('row_number', f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
      .withColumn('increase', ((f.col('row_number') > f.lit(2)) &
                               (f.col('row_number') % f.lit(2) == f.lit(1))).cast('Int'))
      .withColumn('pack', f.sum('increase').over(Window.orderBy('row_number')))
      .drop('row_number', 'increase'))
df.show(truncate=False)

输出

+---+------+----+
|id |status|pack|
+---+------+----+
|1  |Y     |0   |
|2  |Y     |0   |
|3  |N     |1   |
|4  |N     |1   |
|5  |N     |2   |
|6  |Y     |2   |
|7  |N     |3   |
|10 |Y     |3   |
|11 |N     |4   |
+---+------+----+

【讨论】:

当我除以 2 时,您的答案有效,但是当我除以 10 时,结果与预期的输出不符 @nmr 我做了修复,再检查一遍 您的答案现在有效。但是 user12345 的答案很简单

以上是关于为pyspark数据框中的记录间隔分配一个常数值的主要内容,如果未能解决你的问题,请参考以下文章

根据间隔pyspark中的记录数增加一列

Pyspark - 基于数据框中的 2 列的不同记录

过滤pySpark数据框中的日期列记录

如何在 PySpark 中为数据框中的所有列替换字符串值与 NULL?

如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?

在 PySpark 数据框中的组中的列上应用函数