用于构建时间线的 Spark 窗口函数
Posted
技术标签:
【中文标题】用于构建时间线的 Spark 窗口函数【英文标题】:Spark Window Function to build timeline 【发布时间】:2022-01-13 08:36:57 【问题描述】:我正在尝试构建时间线,并且我希望能够检测到时间线中断。我有这个测试df:
ID | date |
---|---|
1 | 2012-12-01 |
1 | 2012-12-02 |
1 | 2012-12-03 |
1 | 2012-12-05 |
1 | 2012-12-06 |
1 | 2012-12-07 |
1 | 2012-12-10 |
1 | 2012-12-11 |
我想得到一个开始结束日期的时间表,如下所示:
ID | date | end |
---|---|---|
1 | 2012-12-01 | 2012-12-03 |
1 | 2012-12-05 | 2012-12-07 |
1 | 2012-12-10 | 2012-12-11 |
我一直在尝试:
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
w1 = Window.partitionBy('id').orderBy(F.col('date'))
df2 = (df1.withColumn("group_date", F.when( ~(F.date_add(F.col('snapshot_date'), -1) == F.lag(F.col("snapshot_date"), 1, 0).over(w1)), F.lit(1)).otherwise(F.lit(0))).filter(F.col('group_date')>1)
但不确定如何获得正确的结束日期
【问题讨论】:
【参考方案1】:这是一个会话化案例,您可以通过this article 了解更多关于使用 spark 进行会话化的信息。
如果我们将上面引用的文章中的带有窗口的解决方案适应您的具体情况,我们会得到以下代码:
from pyspark.sql import functions as F
from pyspark.sql import Window
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
w1 = Window.partitionBy('id').orderBy('snapshot_date')
df2 = dftest \
.withColumn('session_change', F.when(F.datediff(F.col('snapshot_date'), F.lag('snapshot_date').over(w1)) > 1, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('session_id', F.sum('session_change').over(w1)) \
.groupBy('ID', 'session_id') \
.agg(F.min('snapshot_date').alias('date'), F.max('snapshot_date').alias('end')) \
.drop('session_id')
这将为我们提供以下df2
:
+---+----------+----------+
|ID |date |end |
+---+----------+----------+
|1 |2012-12-01|2012-12-03|
|1 |2012-12-05|2012-12-07|
|1 |2012-12-10|2012-12-11|
+---+----------+----------+
【讨论】:
以上是关于用于构建时间线的 Spark 窗口函数的主要内容,如果未能解决你的问题,请参考以下文章