如何在pySpark中有条件地替换值并将替换后的值用于下一个条件

Posted

技术标签:

【中文标题】如何在pySpark中有条件地替换值并将替换后的值用于下一个条件【英文标题】:How to replace value conditionally in pySpark and use the replaced value for the next condition 【发布时间】:2018-10-23 09:06:22 【问题描述】:

首先,希望我的问题格式正确。 我有这个数据框:

df = sc.parallelize([
('1112', 1, 0, 1, '2018-05-01'),
('1111', 1, 1, 1, '2018-05-01'),
('1111', 1, 3, 2, '2018-05-04'),
('1111', 1, 1, 2, '2018-05-05'),
('1111', 1, 1, 2, '2018-05-06'),
]).toDF(["customer_id", "buy_count", "date_difference", "expected_answer", "date"]).cache()

df.show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer|      date|
+-----------+---------+---------------+---------------+----------+
|       1111|        1|              1|              1|2018-05-01|
|       1111|        1|              3|              2|2018-05-04|
|       1111|        1|              1|              2|2018-05-05|
|       1111|        1|              1|              2|2018-05-06|
|       1112|        1|              0|              1|2018-05-01|
+-----------+---------+---------------+---------------+----------+

我想创建“expected_answer”列:

如果客户超过 3 天没有购买(date_difference >=3),我想将他的 buy_count 增加 1。之后的每次购买都需要有新的 buy_count,除非他不再购买 3在这种情况下,buy_count 将再次增加。

这是我的代码以及我已经完成了多少。问题似乎是 spark 实际上并没有估算值,而是创建了一个新列。有没有办法克服这个问题?我也尝试过使用 Hive,结果完全相同。

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import when

windowSpec = func.lag(df['buy_count']).\
over(Window.partitionBy(df['customer_id']).\
orderBy(df['date'].asc()))

df.withColumn('buy_count', \
              when(df['date_difference'] >=3, windowSpec +1).when(windowSpec.isNull(), 1)\
              .otherwise(windowSpec)).show()

+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer|      date|
+-----------+---------+---------------+---------------+----------+
|       1112|        1|              0|              1|2018-05-01|
|       1111|        1|              1|              1|2018-05-01|
|       1111|        2|              3|              2|2018-05-04|
|       1111|        1|              1|              2|2018-05-05|
|       1111|        1|              1|              2|2018-05-06|
+-----------+---------+---------------+---------------+----------+

我怎样才能得到预期的结果?提前致谢。

【问题讨论】:

Spark - Window with recursion? - Conditionally propagating values across rows的可能重复 您还可以查看Spark SQL window function with complex condition,它显示了与日期相同的模式,但在 Scala 中。 【参考方案1】:

终于想通了。感谢大家指出类似案例。

我的印象是 SUM() over Partition 将对整个分区求和,而不仅仅是对当前行之前的所有内容求和。幸运的是,我能够用一个非常简单的 SQL 解决我的问题:

SELECT SUM(CASE WHEN(date_difference>=3) THEN 1 ELSE 0 END) OVER (PARTITION BY customer_id ORDER BY date) 
       FROM df

sqlContext.sql(qry).show()

【讨论】:

以上是关于如何在pySpark中有条件地替换值并将替换后的值用于下一个条件的主要内容,如果未能解决你的问题,请参考以下文章

使用 dplyr [重复] 有条件地将一列中的值替换为另一列中的值

有条件地替换 SELECT 中的值

PySpark 将低于计数阈值的值替换为值

Pyspark:用字典中的值替换列的值

如何在 pySpark 中有效地从字符串数据框中替换多个正则表达式模式的所有实例?

有没有办法在 SCSS 中有条件地替换全局变量?