滚动计数器 24 小时时间戳 - pyspark
Posted
技术标签:
【中文标题】滚动计数器 24 小时时间戳 - pyspark【英文标题】:Rolling counter for 24 hours timestamp - pyspark 【发布时间】:2021-06-07 14:22:04 【问题描述】:以下是输入数据框:
id,timestamp
1,1/10/18 17:57
1,1/12/18 13:29
1,2/1/18 11:04
1,2/14/18 10:53
1,3/7/18 11:16
1,3/19/18 8:45
1,3/19/18 12:44
1,3/30/18 23:45
1,4/15/18 19:54
1,4/15/18 19:54
1,4/15/18 19:54
1,7/9/18 19:26
1,7/9/18 19:26
1,7/10/18 6:03
1,7/10/18 9:03
如果前一行在 24 小时时间戳内,我需要创建另一列“计数器”,计数器会增加。如果不在 24 小时内,计数器应重置为 0。对于上述输入,所需的输出是:
id,timestamp,counter
1,1/10/18 17:57,0
1,1/12/18 13:29,0
1,2/1/18 11:04,0
1,2/14/18 10:53,0
1,3/7/18 11:16,0
1,3/19/18 8:45,0
1,3/19/18 12:44,1
1,3/30/18 23:45,0
1,4/15/18 19:54,0
1,4/15/18 19:54,1
1,4/15/18 19:54,2
1,7/9/18 19:26,0
1,7/9/18 19:26,1
1,7/10/18 6:03,2
1,7/10/18 9:03,3
我已经能够使用以下代码解决问题,其中:
-
我首先使用滞后来查找以前的值。
计算小时差并检查是否在 24 小时内并创建一个标志。
如果标志为 1,则使用滞后时间戳中的 to_date,否则使用 to_date(timestamp)。
使用 row_number 和按 id、flag 分区来获取计数器值。
除了输出中的最后一行之外,逻辑工作正常,这是因为我从滞后中选择 to_date,这似乎不正确。我假设我需要对数据框进行分组,但我无法继续。任何帮助表示赞赏。
test_1=df.withColumn("timestamp",to_timestamp(col("timestamp"),'MM/dd/yy HH:mm'))
newdf = test_1.withColumn("lag",lag(col("timestamp"),1).over(Window.partitionBy("id").orderBy("timestamp")))
df2 = (
newdf.withColumn('lag',col('lag'))\
.withColumn('timestamp',col('timestamp'))\
.withColumn('DiffInSeconds',col('timestamp').cast("long")-col("lag").cast("long")))
df3 = df2.withColumn('DiffInHours',round(col('DiffInSeconds')/3600)).drop("DiffInSeconds")
df5 = df3.withColumn("flaglag", when((col("DiffInHours") >= 0 ) & (col("DiffInHours") <= 24 ),0).otherwise(1))
df6=df5.withColumn("to_date", when(col("flaglag") == 1,to_date('timestamp')).otherwise(to_date('lag')))
nw_win=Window.partitionBy("id","to_date").orderBy("timestamp")
final_df=df6.withColumn("counter",row_number().over(nw_win)-1)
【问题讨论】:
不是答案,但您可以对转换进行排序。这样您就不必为每个转换创建一个新变量。当然,除非有特定原因要跟踪中间数据帧,否则可以尝试以下操作:transformed_df = df.withColumn(...).withColumn(...)...
【参考方案1】:
我可以通过创建组号列来解决这个问题。
tmp_join = final_df.withColumn("group", sum(when(col("flaglag") == 0,0).otherwise(1)).over(Window.partitionBy("id").orderBy("timestamp")))
final_new_df = tmp_join.withColumn("C", row_number().over(Window.partitionBy("id","group").orderBy("timestamp"))-1)
【讨论】:
以上是关于滚动计数器 24 小时时间戳 - pyspark的主要内容,如果未能解决你的问题,请参考以下文章