PySpark:如何根据其他行值的值更改行+列的值

Posted

技术标签:

【中文标题】PySpark:如何根据其他行值的值更改行+列的值【英文标题】:PySpark: How do I change the value of a row+column based on the value of other row values 【发布时间】:2019-08-19 19:03:18 【问题描述】:

我有一个这样的数据框...

+----------+-----+
|      date|price|
+----------+-----+
|2019-01-01|   25|
|2019-01-02|   22|
|2019-01-03|   20|
|2019-01-04|   -5|
|2019-01-05|   -1|
|2019-01-06|   -2|
|2019-01-07|    5|
|2019-01-08|  -11|
+----------+-----+

我想根据需要回顾其他行的逻辑创建一个新列 - 而不仅仅是同一行的列值

我尝试了一些 UDF,但它采用了相应列的行值。不知道怎么看其他行...

例如: 我想创建一个新列“newprice” - 这将是这样的......

+----------+-----+----------+
|      date|price|new price
+----------+-----+----------+
|2019-01-01|   25| 25
|2019-01-02|   22| 22
|2019-01-03|   20| 20
|2019-01-04|   -5| 20
|2019-01-05|   -1| 20
|2019-01-06|   -2| 20
|2019-01-07|    5| 5
|2019-01-08|  -11| 5
+----------+-----+-----------+

基本上,新列值中的每一行不是基于相应行的值,而是基于其他行的值...

逻辑:如果价格为负值,则回顾前几天,如果那一天为正值 - 接受它或再返回一天,直到出现正值...

    dateprice = [('2019-01-01',25),('2019-01-02',22),('2019-01-03',20),('2019-01-04', -5),\
     ('2019-01-05',-1),('2019-01-06',-2),('2019-01-07',5),('2019-01-08', -11)]

   dataDF = sqlContext.createDataFrame(dateprice, ('date', 'price'))



我们将不胜感激。

【问题讨论】:

【参考方案1】:

首先用price 列填充new price 列,但用nulls 替换负值。然后,您可以使用Fill in null with previously known good value with pyspark 中显示的技术来获取最后一个非空值,在这种情况下,它将是最后一个正值。

例如:

from pyspark.sql.functions import col, last, when
from pyspark.sql import Window

w = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

dataDF.withColumn("new_price", when(col("price") >= 0, col("price")))\
    .withColumn(
        "new_price",
        last('new_price', True).over(w)
    )\
    .show()
#+----------+-----+---------+
#|      date|price|new_price|
#+----------+-----+---------+
#|2019-01-01|   25|       25|
#|2019-01-02|   22|       22|
#|2019-01-03|   20|       20|
#|2019-01-04|   -5|       20|
#|2019-01-05|   -1|       20|
#|2019-01-06|   -2|       20|
#|2019-01-07|    5|        5|
#|2019-01-08|  -11|        5|
#+----------+-----+---------+

这里我利用了when 在条件不匹配且未指定otherwise 时默认返回null 的事实。

【讨论】:

顺便说一句:我正在阅读有关窗口的东西......但看着这个我也想看看我是否可以做一些事情,比如用 2 天前的值替换行的值 - 无论是负数还是正数或空值...只是复制 2 天前或更确切地说是 n 天前的值。 嗨@pault,如果现在我想在名称 A 或 B 内进行操作?我怎样才能做到这一点? +---------+---+------+---------+ |日期|名称|价格|新价格| +---------+---+-----+---------+ |2019-01-01|一个| 25| 25| |2019-01-01|乙| 2| 2| |2019-01-02|一个| 22| 22| |2019-01-02|乙| -1| 22| |2019-01-03|一个| 20| 20| |2019-01-03|乙| 10| 10| |2019-01-04|一个| -5| 10| |2019-01-04|乙| 7| 7| |2019-01-05|一个| -1| 7| |2019-01-05|乙| 0| 0| +---------+---+------+---------+ 嗨@pault,新表在主要问题结束 w = Window.partitionBy("name").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow) @SanDNath 如果您有新问题,请编辑您的问题或发布新问题。不要在 cmets 中发布代码。【参考方案2】:

我使用 Spark SQL 尝试了这个。让我分两部分解释我的解决方案,

首先,当价格为负数时,我们可以获取价格为正数的最近日期,否则我们可以自行填充价格,如下所示,

spark.sql("""
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date 
from dataset
""").show()

输出:

+----------+-----+-------------+
|      date|price|price_or_date|
+----------+-----+-------------+
|2019-01-01|   25|           25|
|2019-01-02|   22|           22|
|2019-01-03|   20|           20|
|2019-01-04|   -5|   2019-01-03|
|2019-01-05|   -1|   2019-01-03|
|2019-01-06|   -2|   2019-01-03|
|2019-01-07|    5|            5|
|2019-01-08|  -11|   2019-01-07|
+----------+-----+-------------+

其次,您可以使用date 和此派生列对同一数据集执行left join。所以,现在price_or_date 列中的价格将显示为null。最后我们可以对它们执行一个简单的coalesce

结合它们,我们可以实现如下所示的最终查询以生成所需的输出,

spark.sql("""
select 
   a.date
 , a.price
 , coalesce(b.price, a.price) as new_price
from
(
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date 
from dataset
) a
left join dataset b
on a.price_or_date = b.date 
order by a.date""").show()

输出:

+----------+-----+---------+
|      date|price|new_price|
+----------+-----+---------+
|2019-01-01|   25|       25|
|2019-01-02|   22|       22|
|2019-01-03|   20|       20|
|2019-01-04|   -5|       20|
|2019-01-05|   -1|       20|
|2019-01-06|   -2|       20|
|2019-01-07|    5|        5|
|2019-01-08|  -11|        5|
+----------+-----+---------+

希望这会有所帮助。

【讨论】:

以上是关于PySpark:如何根据其他行值的值更改行+列的值的主要内容,如果未能解决你的问题,请参考以下文章

如何通过在python中添加同一列的2个不同行值的值来估算特定的行值

PYSPARK:根据条件用另一个行值更新一行中的值?

如何根据列值更改行按钮类

Pyspark数据帧:根据另一列的值提取列

根据循环内另一列的值将列的值更改为nan

Pyspark:如何根据另一列的值填充空值