填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周
Posted
技术标签:
【中文标题】填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周【英文标题】:Fill missing dates in two groups and convert data to weekly in Spark dataFrame 【发布时间】:2022-01-21 14:32:12 【问题描述】:我有这个数据框,其中有很多缺失的日期
df = pd.DataFrame('date':['2021-12-1','2021-12-2','2021-12-21','2021-12-1','2021-12-7','2021-12-1','2021-12-5','2021-12-1','2021-12-5'],
'id1':['a1','a1','a1','a1','a1','a2','a2','a2','a2'],
'id2':['b1','b1','b1','b2','b2','b3','b3','b4','b4'],
'value1':[1,5,7,2,9,3,0,1,7],
'value2':[6,2,8,1,9,3,0,2,6])
看起来像这样
date id1 id2 value1 value2
0 2021-12-1 a1 b1 1 6
1 2021-12-2 a1 b1 5 2
2 2021-12-21 a1 b1 7 8
3 2021-12-1 a1 b2 2 1
4 2021-12-7 a1 b2 9 9
5 2021-12-1 a2 b3 3 3
6 2021-12-5 a2 b3 0 0
7 2021-12-1 a2 b4 1 2
8 2021-12-5 a2 b4 7 6
我希望我的输出看起来像这样,频率从每天更改为每周,并且每周从星期一开始。
id1 id2 date value1 value2
0 a1 b1 2021-12-06 6 8
1 a1 b1 2021-12-13 0 0
2 a1 b1 2021-12-20 0 0
3 a1 b1 2021-12-27 7 8
4 a1 b2 2021-12-06 2 1
5 a1 b2 2021-12-13 9 9
6 a2 b3 2021-12-06 3 3
7 a2 b4 2021-12-06 8 8
我已经在 pandas 中完成了编码
首先,我用零值填充缺失的日期,然后在第二步中使用resample
将每日数据转换为每周数据。我在这里使用W-Mon
,这意味着我从星期一开始我的一周。
#Filling missing dates values with zero
df['date'] = pd.to_datetime(df['date'])
df = (df.set_index('date')
.groupby(['id1','id2'])['value1','value2']
.apply(lambda x: x.asfreq('d', fill_value=0))
.reset_index()
[['date','id1','id2','value1','value2']])
#convert to weekly data and set monday as starting day for each week
df = (df.groupby(['id1','id2'])
.resample('W-Mon', label='right', closed = 'left', on='date')
.agg('value1':'sum',"value2":'sum' )
.reset_index())
我正在尝试将我的代码转换为我已经通过 this 的 spark 有没有更简单的方法?
【问题讨论】:
我已经根据你的 pandas 代码编辑了预期的输出 我已经编辑了。请更新熊猫版本,您也将获得与我相同的输出。 Resample 仅适用于最新的 pandas 版本。 【参考方案1】:这样就可以了,代码非常简单,但如果有疑问,请检查 spark 文档中的函数
df = df.withColumn('Date', F.next_day('Date','Mon'))
df = df.groupby((['id1','id2','Date'])).agg(*[F.sum(c).alias(c) for c in ['value1', 'value2']])
new_dts = df.groupby(['id1','id2']).agg(
F.array_except(
F.expr('sequence(min(Date), max(Date), interval 1 week)'),
F.collect_set('Date'),
).name('Date')
)
new_dts = new_dts.withColumn('Date', F.explode('Date'))
df = df.union(new_dts).na.fill('0')
df.show()
+---+---+----------+------+------+
|id1|id2| Date|value1|value2|
+---+---+----------+------+------+
| a1| b2|2021-12-06| 2| 1|
| a1| b1|2021-12-27| 7| 8|
| a1| b1|2021-12-06| 6| 8|
| a2| b4|2021-12-06| 8| 8|
| a2| b3|2021-12-06| 3| 3|
| a1| b2|2021-12-13| 9| 9|
| a1| b1|2021-12-13| 0| 0|
| a1| b1|2021-12-20| 0| 0|
+---+---+----------+------+------+
您可能需要考虑您当前正在将日期与下周的星期一对齐。要将您的日期与同一周的星期一对齐,而不是这样做
F.date_sub(F.next_day('Date','Mon'), 7)
【讨论】:
您错过了对 value2 值求和。【参考方案2】:试试这个。
创建一个 tmp
数据框,该数据框具有从下周一开始的日期序列,从日期列的最小值开始,间隔 7 天。然后将其与主数据框连接,然后根据周数之间的差异进行操作:
from pyspark.sql import functions as F
df = df.withColumn("date",F.to_date("date"))
tmp = (df.groupBy("id1","id2").agg(F.min("date").alias("Mindate")
,F.max("date").alias("Maxdate"))
.withColumn("MinMonday",F.next_day("Mindate","Mon"))
.withColumn("MaxMonday",F.next_day("Maxdate","Mon"))
.withColumn("Seq",
F.explode(F.expr("sequence(MinMonday,MaxMonday,interval 7 day)")))
.drop("Mindate","Maxdate","MinMonday","MaxMonday"))
def maskedvalue(col) : return f"""CASE WHEN weekdiff <=1 THEN col ELSE 0 END"""
out = (df.alias("left").join(tmp.alias("right"),
on=[df['id1']==tmp['id1'],df['id2']==tmp['id2'],df['date']<=tmp['Seq']])
.select("date","left.id1","left.id2","Seq","value1","value2")
.withColumn("weekdiff",F.weekofyear("Seq")-F.weekofyear("date"))
.withColumn("value1",F.expr(maskedvalue(("value1"))))
.withColumn("value2",F.expr(maskedvalue(("value2"))))
.groupBy("id1","id2","Seq").agg(F.sum("value1").alias("value1")
,F.sum("value2").alias("value2"))
.withColumnRenamed("Seq","Date")
)
out.orderBy("id1","id2","Date").show()
+---+---+----------+------+------+
|id1|id2| Date|value1|value2|
+---+---+----------+------+------+
| a1| b1|2021-12-06| 6| 8|
| a1| b1|2021-12-13| 0| 0|
| a1| b1|2021-12-20| 0| 0|
| a1| b1|2021-12-27| 7| 8|
| a1| b2|2021-12-06| 2| 1|
| a1| b2|2021-12-13| 9| 9|
| a2| b3|2021-12-06| 3| 3|
| a2| b4|2021-12-06| 8| 8|
+---+---+----------+------+------+
请注意,tmp 数据框如下所示:
+---+---+----------+
|id1|id2| Seq|
+---+---+----------+
| a1| b1|2021-12-06|
| a1| b1|2021-12-13|
| a1| b1|2021-12-20|
| a1| b1|2021-12-27|
| a1| b2|2021-12-06|
| a1| b2|2021-12-13|
| a2| b3|2021-12-06|
| a2| b4|2021-12-06|
+---+---+----------+
【讨论】:
以上是关于填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周的主要内容,如果未能解决你的问题,请参考以下文章