How to add complementary date ranges to a PySpark df
Posted: 2020-08-30 20:51:06

Question:

I'm new to Spark and struggling with this:
From a table of user IDs and dates I built this df (a sketch of the assumed raw input follows the two tables below):
+-------+--------+----------+----------+----+------+
|user_id|subgroup| from_date|   to_date|days|active|
+-------+--------+----------+----------+----+------+
|6651481|       0|2018-08-26|2020-01-05| 498|     1|
|6651481|       1|2020-01-10|2020-02-17|  39|     1|
|6651481|       2|2020-02-19|2020-03-06|  17|     1|
+-------+--------+----------+----------+----+------+
I want to add the "inactive" ranges to the active df:
+-------+--------+----------+----------+----+------+
|user_id|subgroup| from_date|   to_date|days|active|
+-------+--------+----------+----------+----+------+
|6651481|       0|2018-08-26|2020-01-05| 498|     1|
|6651481|        |2020-01-06|2020-01-09|   3|     0|
|6651481|       1|2020-01-10|2020-02-17|  39|     1|
|6651481|       2|2020-02-19|2020-03-06|  17|     1|
+-------+--------+----------+----------+----+------+
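For context, the raw table this starts from would hold one row per user per active day. A minimal sketch of that assumed shape (the column name activity_date and the sample rows are hypothetical; the question does not show the raw table):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# One row per (user_id, activity_date); runs of consecutive dates
# correspond to the active ranges shown above.
raw = spark.createDataFrame(
    [(6651481, "2020-01-10"), (6651481, "2020-01-11"),
     (6651481, "2020-01-13")],
    ["user_id", "activity_date"],
).withColumn("activity_date", F.to_date("activity_date"))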
Thanks for your help!
Comments:
***.com/questions/41711716/…

Answer 1:

from pyspark.sql.functions import (
    coalesce, datediff, lag, lit,
    sum as _sum, min as _min, max as _max,
)
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Load the raw (user_id, activity_date) table; `path` is a placeholder.
df = spark.read.format('delta').load(path)
df = df.dropDuplicates()

# Gaps-and-islands: a jump of more than one day between successive
# activity_dates starts a new subgroup for that user.
w = Window.partitionBy("user_id").orderBy("activity_date")
diff = coalesce(datediff("activity_date", lag("activity_date", 1).over(w)), lit(0))
indicator = (diff > 1).cast("integer")
subgroup = _sum(indicator).over(w).alias("subgroup")

# Collapse each subgroup into one active range: first/last day and day count.
df = df.withColumn('ones', lit(1))
df_activity = (df.select("*", subgroup)
                 .groupBy("user_id", "subgroup")
                 .agg(_min('activity_date').alias("from_date"),
                      _max('activity_date').alias('to_date'),
                      _sum('ones').alias('days')))
df_activity = df_activity.drop('subgroup')

# Each inactive range starts the day after the previous active range ends
# and finishes the day before the current one begins.
w2 = Window.partitionBy("user_id").orderBy("from_date")
df_wo_activity = df_activity.select("*", lag("to_date", 1).over(w2).alias('wo_from_date'))
df_wo_activity = df_wo_activity.withColumn('wo_from_date', F.date_add(df_wo_activity.wo_from_date, 1))
df_wo_activity = df_wo_activity.withColumn('wo_to_date', F.date_add(df_wo_activity.from_date, -1))
df_wo_activity = df_wo_activity.withColumn('wo_days',
    F.datediff(df_wo_activity['wo_to_date'], df_wo_activity['wo_from_date']) + 1)
df_wo_activity = df_wo_activity.select('user_id', 'wo_from_date', 'wo_to_date', 'wo_days')

# Flag both sets and stack them; union matches columns by position, so the
# column order of the two DataFrames must line up.
df_wo_activity = df_wo_activity.withColumn('is_active', lit(0))
df_activity = df_activity.withColumn('is_active', lit(1))
df_all = df_activity.union(df_wo_activity)
df_all = df_all.sort('user_id', 'from_date')

# A user's first active range has no predecessor, so its gap row carries a
# null start date; drop those before writing. `write_path` is a placeholder.
df_all.where("from_date is not null").write.format("delta").mode('append').save(write_path)