连续行之间的日期差异 - Pyspark Dataframe

Posted

技术标签:

【中文标题】连续行之间的日期差异 - Pyspark Dataframe【英文标题】:Date difference between consecutive rows - Pyspark Dataframe 【发布时间】:2016-07-02 04:04:50 【问题描述】:

我有一个结构如下的表

USER_ID     Tweet_ID                 Date
  1           1001       Thu Aug 05 19:11:39 +0000 2010
  1           6022       Mon Aug 09 17:51:19 +0000 2010
  1           1041       Sun Aug 19 11:10:09 +0000 2010
  2           9483       Mon Jan 11 10:51:23 +0000 2012
  2           4532       Fri May 21 11:11:11 +0000 2012
  3           4374       Sat Jul 10 03:21:23 +0000 2013
  3           4334       Sun Jul 11 04:53:13 +0000 2013

基本上我想做的是有一个 PysparkSQL 查询来计算具有相同 user_id 编号的连续记录的日期差异(以秒为单位)。预期的结果是:

1      Sun Aug 19 11:10:09 +0000 2010 - Mon Aug 09 17:51:19 +0000 2010     839930
1      Mon Aug 09 17:51:19 +0000 2010 - Thu Aug 05 19:11:39 +0000 2010     340780
2      Fri May 21 11:11:11 +0000 2012 - Mon Jan 11 10:51:23 +0000 2012     1813212
3      Sun Jul 11 04:53:13 +0000 2013 - Sat Jul 10 03:21:23 +0000 2013     5510

【问题讨论】:

【参考方案1】:

另一种可能是:

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

df.withColumn("time_intertweet",(df.date.cast("bigint") - lag(df.date.cast("bigint"), 1)
.over(Window.partitionBy("user_‌​id")
.orderBy("date")‌​))
.cast("bigint"))

【讨论】:

【参考方案2】:

已编辑感谢@cool_kid

@Joesemy 的回答非常好,但对我不起作用,因为 cast("bigint") 引发了错误。所以我以这种方式使用了 pyspark.sql.functions 模块 中的datediff 函数,它起作用了:

from pyspark.sql.functions import *
from pyspark.sql.window import Window

df.withColumn("time_intertweet", datediff(df.date, lag(df.date, 1)
    .over(Window.partitionBy("user_‌​id")
    .orderBy("date")‌​)))

【讨论】:

这在将 '-' 更改为 ',' -> datediff(df.date , lag(df.date, 1) 并导入 -“从 pyspark.sql 导入函数为 f”和“从 pyspark.sql.window 导入窗口”并更新了 f.datediff 和 f.lag :) 谢谢 LePuppy! 这不会按要求计算日期差异以秒为单位,而是以天为单位:spark.apache.org/docs/2.1.0/api/python/…【参考方案3】:

像这样:

df.registerTempTable("df")

sqlContext.sql("""
     SELECT *, CAST(date AS bigint) - CAST(lag(date, 1) OVER (
              PARTITION BY user_id ORDER BY date) AS bigint) 
     FROM df""")

【讨论】:

另一种方式:df.withColumn("time_intertweet",(df.date.cast("bigint") - lag(df.date.cast("bigint"),1).over(Window .partitionBy("user_id").orderBy("date"))).cast("bigint")) @Joss 您能否将其添加到答案中?我把它转换成维基。谢谢。

以上是关于连续行之间的日期差异 - Pyspark Dataframe的主要内容,如果未能解决你的问题,请参考以下文章

如何根据条件在pyspark中跨连续行保留值

PySpark SQL 中的日期之间的差异

Pyspark:两个日期之间的差异(Cast TimestampType,Datediff)

同一column_python中同一数据帧中两个连续行之间的相关性

添加一列,这是熊猫中连续行差异的结果

计算Hive中计数器数据的差异