PySpark:如何根据其他行值的值更改行+列的值
Posted
技术标签:
【中文标题】PySpark:如何根据其他行值的值更改行+列的值【英文标题】:PySpark: How do I change the value of a row+column based on the value of other row values 【发布时间】:2019-08-19 19:03:18 【问题描述】:我有一个这样的数据框...
+----------+-----+
| date|price|
+----------+-----+
|2019-01-01| 25|
|2019-01-02| 22|
|2019-01-03| 20|
|2019-01-04| -5|
|2019-01-05| -1|
|2019-01-06| -2|
|2019-01-07| 5|
|2019-01-08| -11|
+----------+-----+
我想根据需要回顾其他行的逻辑创建一个新列 - 而不仅仅是同一行的列值
我尝试了一些 UDF,但它采用了相应列的行值。不知道怎么看其他行...
例如: 我想创建一个新列“newprice” - 这将是这样的......
+----------+-----+----------+
| date|price|new price
+----------+-----+----------+
|2019-01-01| 25| 25
|2019-01-02| 22| 22
|2019-01-03| 20| 20
|2019-01-04| -5| 20
|2019-01-05| -1| 20
|2019-01-06| -2| 20
|2019-01-07| 5| 5
|2019-01-08| -11| 5
+----------+-----+-----------+
基本上,新列值中的每一行不是基于相应行的值,而是基于其他行的值...
逻辑:如果价格为负值,则回顾前几天,如果那一天为正值 - 接受它或再返回一天,直到出现正值...
dateprice = [('2019-01-01',25),('2019-01-02',22),('2019-01-03',20),('2019-01-04', -5),\
('2019-01-05',-1),('2019-01-06',-2),('2019-01-07',5),('2019-01-08', -11)]
dataDF = sqlContext.createDataFrame(dateprice, ('date', 'price'))
我们将不胜感激。
【问题讨论】:
【参考方案1】:首先用price
列填充new price
列,但用null
s 替换负值。然后,您可以使用Fill in null with previously known good value with pyspark 中显示的技术来获取最后一个非空值,在这种情况下,它将是最后一个正值。
例如:
from pyspark.sql.functions import col, last, when
from pyspark.sql import Window
w = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataDF.withColumn("new_price", when(col("price") >= 0, col("price")))\
.withColumn(
"new_price",
last('new_price', True).over(w)
)\
.show()
#+----------+-----+---------+
#| date|price|new_price|
#+----------+-----+---------+
#|2019-01-01| 25| 25|
#|2019-01-02| 22| 22|
#|2019-01-03| 20| 20|
#|2019-01-04| -5| 20|
#|2019-01-05| -1| 20|
#|2019-01-06| -2| 20|
#|2019-01-07| 5| 5|
#|2019-01-08| -11| 5|
#+----------+-----+---------+
这里我利用了when
在条件不匹配且未指定otherwise
时默认返回null
的事实。
【讨论】:
顺便说一句:我正在阅读有关窗口的东西......但看着这个我也想看看我是否可以做一些事情,比如用 2 天前的值替换行的值 - 无论是负数还是正数或空值...只是复制 2 天前或更确切地说是 n 天前的值。 嗨@pault,如果现在我想在名称 A 或 B 内进行操作?我怎样才能做到这一点? +---------+---+------+---------+ |日期|名称|价格|新价格| +---------+---+-----+---------+ |2019-01-01|一个| 25| 25| |2019-01-01|乙| 2| 2| |2019-01-02|一个| 22| 22| |2019-01-02|乙| -1| 22| |2019-01-03|一个| 20| 20| |2019-01-03|乙| 10| 10| |2019-01-04|一个| -5| 10| |2019-01-04|乙| 7| 7| |2019-01-05|一个| -1| 7| |2019-01-05|乙| 0| 0| +---------+---+------+---------+ 嗨@pault,新表在主要问题结束 w = Window.partitionBy("name").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow) @SanDNath 如果您有新问题,请编辑您的问题或发布新问题。不要在 cmets 中发布代码。【参考方案2】:我使用 Spark SQL 尝试了这个。让我分两部分解释我的解决方案,
首先,当价格为负数时,我们可以获取价格为正数的最近日期,否则我们可以自行填充价格,如下所示,
spark.sql("""
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date
from dataset
""").show()
输出:
+----------+-----+-------------+
| date|price|price_or_date|
+----------+-----+-------------+
|2019-01-01| 25| 25|
|2019-01-02| 22| 22|
|2019-01-03| 20| 20|
|2019-01-04| -5| 2019-01-03|
|2019-01-05| -1| 2019-01-03|
|2019-01-06| -2| 2019-01-03|
|2019-01-07| 5| 5|
|2019-01-08| -11| 2019-01-07|
+----------+-----+-------------+
其次,您可以使用date
和此派生列对同一数据集执行left join
。所以,现在price_or_date
列中的价格将显示为null
。最后我们可以对它们执行一个简单的coalesce
。
结合它们,我们可以实现如下所示的最终查询以生成所需的输出,
spark.sql("""
select
a.date
, a.price
, coalesce(b.price, a.price) as new_price
from
(
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date
from dataset
) a
left join dataset b
on a.price_or_date = b.date
order by a.date""").show()
输出:
+----------+-----+---------+
| date|price|new_price|
+----------+-----+---------+
|2019-01-01| 25| 25|
|2019-01-02| 22| 22|
|2019-01-03| 20| 20|
|2019-01-04| -5| 20|
|2019-01-05| -1| 20|
|2019-01-06| -2| 20|
|2019-01-07| 5| 5|
|2019-01-08| -11| 5|
+----------+-----+---------+
希望这会有所帮助。
【讨论】:
以上是关于PySpark:如何根据其他行值的值更改行+列的值的主要内容,如果未能解决你的问题,请参考以下文章