PySpark 条件增量

Posted

技术标签:

【中文标题】PySpark 条件增量【英文标题】:PySpark conditional increment 【发布时间】:2016-07-31 21:17:10 【问题描述】:

我是 PySpark 的新手,我正在尝试转换一些派生新变量“COUNT_IDX”的 Python 代码。新变量的初始值为 1,但在满足条件时增加 1。否则,新变量值将与上一条记录中的值相同。

增加的条件是: TRIP_CD 不等于上一条记录 TRIP_CD SIGN 不等于之前的记录 SIGN time_diff 不等于 1。

Python 代码(熊猫数据框):

df['COUNT_IDX'] = 1

for i in range(1, len(df)):
    if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1])
          or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i-1])
          or df['time_diff'].iloc[i] != 1):
        df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] + 1
    else:
        df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1]

这是预期的结果:

TRIP_CD   SIGN   time_diff  COUNT_IDX
2711      -      1          1
2711      -      1          1
2711      +      2          2
2711      -      1          3
2711      -      1          3
2854      -      1          4
2854      +      1          5

在 PySpark 中,我将 COUNT_IDX 初始化为 1。然后使用 Window 函数,我获取了 TRIP_CD 和 SIGN 的滞后并计算了 time_diff,然后尝试:

df = sqlContext.sql('''
   select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff,
   case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag  or  seconds_diff != 1 
        then (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))+1
        else (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP)) 
        end as COUNT_INDEX from df''')

这给了我类似的东西:

TRIP_CD   SIGN   time_diff  COUNT_IDX
2711      -      1          1
2711      -      1          1
2711      +      2          2
2711      -      1          2
2711      -      1          1
2854      -      1          2
2854      +      1          2

如果 COUNT_IDX 在先前记录上更新,则当前记录上的 COUNT_IDX 无法识别要计算的更改。就像 COUNTI_IDX 没有被覆盖或者没有被逐行评估。关于如何解决这个问题的任何想法?

【问题讨论】:

【参考方案1】:

这里需要累积和:

-- cumulative sum
SUM(CAST(  
  -- if at least one condition has been satisfied
  -- we take 1 otherwise 0
  TRIP_CD != TRIP_lag OR SIGN != SIGN_lag OR seconds_diff != 1 AS LONG
)) OVER W
...
WINDOW W AS (PARTITION BY trip ORDER BY times_stamp)

【讨论】:

这是一个创造性的解决方案,但是,我还没有完全实现它。您是将其放在 withColumn 语句中以创建具有累积总和的新列,还是应该在 SQL 中?谢谢! 这旨在替换case whenend 之间的SQL 查询。如果您愿意,可以内联窗口定义。由于数据中缺少一些列,因此您显示它只是伪代码。

以上是关于PySpark 条件增量的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark地图中添加增量数字

增量文件版本的 Pyspark 结构化流错误

将增量文件写入 S3 (MinIO) - PySpark 2.4.3

无法使用 Pyspark 中的更新将字符串插入增量表

PySpark。如何确保每日增量数据在 HIVE 中没有重复的 UUID 作为 PK

在 Pyspark Interactive Shell 中查找列的增量