How to add complementary (inactive) date ranges to a pyspark df

Posted: 2020-08-30 20:51:06

Question:

I'm new to Spark and struggling with this:

Starting from a table containing user IDs and dates, I derived this df:

+-------+--------+----------+----------+----+------+
|user_id|subgroup| from_date|   to_date|days|active|
+-------+--------+----------+----------+----+------+
|6651481|       0|2018-08-26|2020-01-05| 498|     1|
|6651481|       1|2020-01-10|2020-02-17|  39|     1|
|6651481|       2|2020-02-19|2020-03-06|  17|     1|
+-------+--------+----------+----------+----+------+

I want to add the "inactive" ranges to this activity df, like so:

+-------+--------+----------+----------+----+------+
|user_id|subgroup| from_date|   to_date|days|active|
+-------+--------+----------+----------+----+------+
|6651481|       0|2018-08-26|2020-01-05| 498|     1|
|6651481|        |2020-01-06|2020-01-09|   3|     0|
|6651481|       1|2020-01-10|2020-02-17|  39|     1|
|6651481|       2|2020-02-19|2020-03-06|  17|     1|
+-------+--------+----------+----------+----+------+
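In other words, each added row covers the gap between two consecutive active ranges: its from_date is the previous to_date plus one day, and its to_date is the next from_date minus one day. A quick plain-Python sketch of that boundary arithmetic for the first gap (dates taken from the sample table above):

from datetime import date, timedelta

prev_to_date = date(2020, 1, 5)     # to_date of subgroup 0
next_from_date = date(2020, 1, 10)  # from_date of subgroup 1

gap_from = prev_to_date + timedelta(days=1)    # 2020-01-06
gap_to = next_from_date - timedelta(days=1)    # 2020-01-09
print(gap_from, gap_to)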

Thanks for your help!

Comments:

***.com/questions/41711716/…

Answer 1:
import pyspark.sql.functions as F
from pyspark.sql.functions import coalesce, datediff, lag, lit, sum as _sum, min as _min, max as _max
from pyspark.sql.window import Window

# Load the raw activity data (expected to hold one row per user_id per active day).
df = spark.read.format('delta').load(path)
df = df.dropDuplicates()

# A new subgroup starts whenever the gap to the previous activity_date is more than one day.
w = Window.partitionBy("user_id").orderBy("activity_date")
diff = coalesce(datediff("activity_date", lag("activity_date", 1).over(w)), lit(0))
indicator = (diff > 1).cast("integer")
subgroup = _sum(indicator).over(w).alias("subgroup")  # running count of gaps = subgroup id

# Collapse each subgroup into one active range: first date, last date, number of active days.
df = df.withColumn('ones', lit(1))
df_activity = (df.select("*", subgroup)
                 .groupBy("user_id", "subgroup")
                 .agg(_min('activity_date').alias('from_date'),
                      _max('activity_date').alias('to_date'),
                      _sum('ones').alias('days')))
df_activity = df_activity.drop('subgroup')

# An inactive range runs from the day after the previous to_date
# up to the day before the current from_date.
w2 = Window.partitionBy("user_id").orderBy("from_date")
df_wo_activity = df_activity.select("*", lag("to_date", 1).over(w2).alias('wo_from_date'))
df_wo_activity = df_wo_activity.withColumn('wo_from_date', F.date_add('wo_from_date', 1))
df_wo_activity = df_wo_activity.withColumn('wo_to_date', F.date_add('from_date', -1))
df_wo_activity = df_wo_activity.withColumn('wo_days', F.datediff('wo_to_date', 'wo_from_date') + 1)
df_wo_activity = df_wo_activity.select('user_id', 'wo_from_date', 'wo_to_date', 'wo_days')

# Flag both sets and stack them; union is positional, so the result keeps
# df_activity's column names: user_id, from_date, to_date, days, is_active.
df_wo_activity = df_wo_activity.withColumn('is_active', lit(0))
df_activity = df_activity.withColumn('is_active', lit(1))
df_all = df_activity.union(df_wo_activity)
df_all = df_all.sort('user_id', 'from_date')

# The first active range per user has no predecessor, so its derived inactive row
# has a null start date; filter those out before writing.
df_all.where("from_date is not null").write.format("delta").mode('append').save(write_path)
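If you want to sanity-check the logic without a delta table, here is a minimal self-contained sketch of the same idea on a tiny in-memory DataFrame. The local SparkSession, the toy data, and the use of unionByName instead of the positional union are assumptions for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Hypothetical local session, purely for the demo.
spark = SparkSession.builder.master("local[1]").appName("gap_ranges_demo").getOrCreate()

# One row per active day for a single user, with a gap between Jan 5 and Jan 10.
raw = spark.createDataFrame(
    [("u1", d) for d in ["2020-01-03", "2020-01-04", "2020-01-05",
                         "2020-01-10", "2020-01-11"]],
    ["user_id", "activity_date"],
).withColumn("activity_date", F.to_date("activity_date"))

# Same idea as above, split into two steps: flag gaps, then cumulative-sum them into subgroup ids.
w = Window.partitionBy("user_id").orderBy("activity_date")
raw = raw.withColumn(
    "new_range",
    (F.coalesce(F.datediff("activity_date", F.lag("activity_date", 1).over(w)), F.lit(0)) > 1).cast("int"))
grouped = raw.withColumn("subgroup", F.sum("new_range").over(w))

active = (grouped.groupBy("user_id", "subgroup")
          .agg(F.min("activity_date").alias("from_date"),
               F.max("activity_date").alias("to_date"),
               F.count("*").alias("days"))
          .drop("subgroup"))

# Each inactive range sits between the previous to_date and the current from_date.
w2 = Window.partitionBy("user_id").orderBy("from_date")
inactive = (active
            .select("user_id",
                    F.date_add(F.lag("to_date", 1).over(w2), 1).alias("from_date"),
                    F.date_add("from_date", -1).alias("to_date"))
            .withColumn("days", (F.datediff("to_date", "from_date") + 1).cast("long"))
            .where("from_date is not null"))  # the first range has no predecessor

result = (active.withColumn("is_active", F.lit(1))
          .unionByName(inactive.withColumn("is_active", F.lit(0)))
          .orderBy("user_id", "from_date"))
result.show()  # two active ranges plus one inactive range 2020-01-06 .. 2020-01-09

unionByName matches the two halves by column name rather than by position, which avoids having to keep them in exactly the same column order.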


Related questions:

PySpark: filter a DataFrame by a date field within a range where the dates are strings

Filtering a datetime range and time zone for Parquet files in PySpark

How to add multiple columns to a pyspark DF using a pandas_udf with multiple source columns?

Add names to a date range search (Microsoft Access VBA)

Google Spreadsheet: add days to a date set by inputBox, fill a range

How to apply a date range to an aggregation query using the NEST client