Pyspark Adding New Column According to Other Column Value

Posted: 2021-01-11 13:38:41

Question:

My input Spark dataframe is:

Date        Client  Values  Ratios
2020-10-26  1       NULL    0.21
2020-10-27  1       NULL    0.1
2020-10-28  1       NULL    0.25
2020-10-29  1       6       0.3
2020-10-30  1       NULL    0.4
2020-10-31  1       NULL    0.5
2020-11-01  1       1       0.3
2020-11-02  1       NULL    0.13
2020-11-03  1       NULL    0.67
2020-11-04  1       NULL    0.54
2020-11-05  1       NULL    0.2
2020-11-06  1       NULL    0.21
2020-11-07  1       2       0.7
2020-11-08  1       9       0.75
2020-11-09  1       NULL    0.9
2020-10-26  2       NULL    0.71
2020-10-27  2       NULL    0.19
2020-10-28  2       NULL    0.3
2020-10-29  2       10      0.34
2020-10-30  2       6       0.35
2020-10-31  2       NULL    0.93
2020-11-01  2       NULL    0.45
2020-11-02  2       NULL    0.43
2020-11-03  2       NULL    0.09
2020-11-04  2       NULL    0.39
2020-11-05  2       3       0.41
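For anyone who wants to reproduce this, a minimal sketch that builds an input DataFrame like the one above (the schema is my assumption; Date is kept as a string and only the first few rows are spelled out):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Values is nullable; Spark infers a numeric type from the non-NULL entries
data = [
    ('2020-10-26', 1, None, 0.21),
    ('2020-10-27', 1, None, 0.1),
    ('2020-10-28', 1, None, 0.25),
    ('2020-10-29', 1, 6,    0.3),
    # ... remaining rows as in the table above
]
df = spark.createDataFrame(data, ['Date', 'Client', 'Values', 'Ratios'])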

I want to create a "Ratios_latest" column. To fill it, for each client I should go down as many rows as that row's "Values" column indicates, and the "Ratios" value found there becomes the "Ratios_latest" value. Based on the data above, the desired output is:

Date        Client  Values  Ratios   Ratios_latest
2020-10-26  1       NULL    0.21     NULL
2020-10-27  1       NULL    0.1      NULL
2020-10-28  1       NULL    0.25     NULL
2020-10-29  1       6       0.3      0.54   -> the "Ratios" value 6 rows later is 0.54
2020-10-30  1       NULL    0.4      NULL
2020-10-31  1       NULL    0.5      NULL
2020-11-01  1       1       0.3      0.13   -> the "Ratios" value 1 row later is 0.13
2020-11-02  1       NULL    0.13     NULL
2020-11-03  1       NULL    0.67     NULL
2020-11-04  1       NULL    0.54     NULL
2020-11-05  1       NULL    0.2      NULL
2020-11-06  1       NULL    0.21     NULL
2020-11-07  1       2       0.7      0.9    -> the "Ratios" value 2 rows later is 0.9
2020-11-08  1       9       0.75     NULL   -> NULL because counting 9 rows ahead would cross into another client
2020-11-09  1       NULL    0.9      NULL
2020-10-26  2       NULL    0.71     NULL
2020-10-27  2       NULL    0.19     NULL
2020-10-28  2       NULL    0.3      NULL
2020-10-29  2       10      0.34     0.98   -> the "Ratios" value 10 rows later is 0.98
2020-10-30  2       6       0.35     0.41   -> the "Ratios" value 6 rows later is 0.41
2020-10-31  2       NULL    0.93     NULL
2020-11-01  2       NULL    0.45     NULL
2020-11-02  2       NULL    0.43     NULL
2020-11-03  2       NULL    0.09     NULL
2020-11-04  2       NULL    0.39     NULL
2020-11-05  2       3       0.41     NULL

Could you please help me with this?

Answer 1:

This is a tricky case with a variable offset, which lead does not support, but it can be solved using collect_list:

import pyspark.sql.functions as F

df2 = df.withColumn(
    'Ratios_latest',
    # collect the current and all following Ratios values per client, in date order
    F.expr('collect_list(Ratios) over (partition by Client order by Date rows between current row and unbounded following)')
).withColumn(
    'Ratios_latest',
    # array indexing is 0-based and index 0 is the current row, so index Values means "Values rows later"
    F.expr('Ratios_latest[Values]')
)

df2.show(99)
+----------+------+------+------+-------------+
|      Date|Client|Values|Ratios|Ratios_latest|
+----------+------+------+------+-------------+
|2020-10-26|     1|  null|  0.21|         null|
|2020-10-27|     1|  null|   0.1|         null|
|2020-10-28|     1|  null|  0.25|         null|
|2020-10-29|     1|     6|   0.3|         0.54|
|2020-10-30|     1|  null|   0.4|         null|
|2020-10-31|     1|  null|   0.5|         null|
|2020-11-01|     1|     1|   0.3|         0.13|
|2020-11-02|     1|  null|  0.13|         null|
|2020-11-03|     1|  null|  0.67|         null|
|2020-11-04|     1|  null|  0.54|         null|
|2020-11-05|     1|  null|   0.2|         null|
|2020-11-06|     1|  null|  0.21|         null|
|2020-11-07|     1|     2|   0.7|          0.9|
|2020-11-08|     1|     9|  0.75|         null|
|2020-11-09|     1|  null|   0.9|         null|
|2020-10-26|     2|  null|  0.71|         null|
|2020-10-27|     2|  null|  0.19|         null|
|2020-10-28|     2|  null|   0.3|         null|
|2020-10-29|     2|    10|  0.34|         null|
|2020-10-30|     2|     6|  0.35|         0.41|
|2020-10-31|     2|  null|  0.93|         null|
|2020-11-01|     2|  null|  0.45|         null|
|2020-11-02|     2|  null|  0.43|         null|
|2020-11-03|     2|  null|  0.09|         null|
|2020-11-04|     2|  null|  0.39|         null|
|2020-11-05|     2|     3|  0.41|         null|
+----------+------+------+------+-------------+
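The same logic can also be written with the DataFrame Window API instead of a SQL expression string; a sketch (the intermediate column name Ratios_all is my own):

from pyspark.sql import Window
import pyspark.sql.functions as F

# frame covering the current row plus everything after it, per client, in date order
w = (Window.partitionBy('Client')
           .orderBy('Date')
           .rowsBetween(Window.currentRow, Window.unboundedFollowing))

df2 = (df
       .withColumn('Ratios_all', F.collect_list('Ratios').over(w))
       .withColumn('Ratios_latest', F.expr('Ratios_all[Values]'))  # index 0 = current row
       .drop('Ratios_all'))

One caveat worth knowing: collect_list skips NULL entries. That is harmless here because Ratios is never NULL, but if Ratios could contain NULLs, the array indices would no longer correspond to row offsets.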
