填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周

Posted

技术标签:

【中文标题】填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周【英文标题】:Fill missing dates in two groups and convert data to weekly in Spark dataFrame 【发布时间】:2022-01-21 14:32:12 【问题描述】:

我有这个数据框,其中有很多缺失的日期

df = pd.DataFrame('date':['2021-12-1','2021-12-2','2021-12-21','2021-12-1','2021-12-7','2021-12-1','2021-12-5','2021-12-1','2021-12-5'],
                   'id1':['a1','a1','a1','a1','a1','a2','a2','a2','a2'],
                   'id2':['b1','b1','b1','b2','b2','b3','b3','b4','b4'],
                   'value1':[1,5,7,2,9,3,0,1,7],
                   'value2':[6,2,8,1,9,3,0,2,6])

看起来像这样

         date id1 id2  value1  value2
0   2021-12-1  a1  b1       1       6
1   2021-12-2  a1  b1       5       2
2  2021-12-21  a1  b1       7       8
3   2021-12-1  a1  b2       2       1
4   2021-12-7  a1  b2       9       9
5   2021-12-1  a2  b3       3       3
6   2021-12-5  a2  b3       0       0
7   2021-12-1  a2  b4       1       2
8   2021-12-5  a2  b4       7       6

我希望我的输出看起来像这样,频率从每天更改为每周,并且每周从星期一开始。

  id1 id2       date  value1  value2
0  a1  b1 2021-12-06       6       8
1  a1  b1 2021-12-13       0       0
2  a1  b1 2021-12-20       0       0
3  a1  b1 2021-12-27       7       8
4  a1  b2 2021-12-06       2       1
5  a1  b2 2021-12-13       9       9
6  a2  b3 2021-12-06       3       3
7  a2  b4 2021-12-06       8       8

我已经在 pandas 中完成了编码 首先,我用零值填充缺失的日期,然后在第二步中使用resample 将每日数据转换为每周数据。我在这里使用W-Mon,这意味着我从星期一开始我的一周。

#Filling missing dates values with zero
df['date'] = pd.to_datetime(df['date'])
df = (df.set_index('date')
      .groupby(['id1','id2'])['value1','value2']
      .apply(lambda x: x.asfreq('d', fill_value=0))
      .reset_index()
      [['date','id1','id2','value1','value2']])
#convert to weekly data and set monday as starting day for each week
df = (df.groupby(['id1','id2'])
       .resample('W-Mon', label='right', closed = 'left', on='date')
       .agg('value1':'sum',"value2":'sum' )
       .reset_index())

我正在尝试将我的代码转换为我已经通过 this 的 spark 有没有更简单的方法?

【问题讨论】:

我已经根据你的 pandas 代码编辑了预期的输出 我已经编辑了。请更新熊猫版本,您也将获得与我相同的输出。 Resample 仅适用于最新的 pandas 版本。 【参考方案1】:

这样就可以了,代码非常简单,但如果有疑问,请检查 spark 文档中的函数

df = df.withColumn('Date', F.next_day('Date','Mon'))

df = df.groupby((['id1','id2','Date'])).agg(*[F.sum(c).alias(c) for c in ['value1', 'value2']])

new_dts = df.groupby(['id1','id2']).agg(
    F.array_except(
        F.expr('sequence(min(Date), max(Date), interval 1 week)'),
        F.collect_set('Date'),
    ).name('Date')
)


new_dts = new_dts.withColumn('Date', F.explode('Date'))
df = df.union(new_dts).na.fill('0')
df.show()
+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b2|2021-12-06|     2|     1|
| a1| b1|2021-12-27|     7|     8|
| a1| b1|2021-12-06|     6|     8|
| a2| b4|2021-12-06|     8|     8|
| a2| b3|2021-12-06|     3|     3|
| a1| b2|2021-12-13|     9|     9|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
+---+---+----------+------+------+

您可能需要考虑您当前正在将日期与下周的星期一对齐。要将您的日期与同一周的星期一对齐,而不是这样做

F.date_sub(F.next_day('Date','Mon'), 7)

【讨论】:

您错过了对 value2 值求和。【参考方案2】:

试试这个。

创建一个 tmp 数据框,该数据框具有从下周一开始的日期序列,从日期列的最小值开始,间隔 7 天。然后将其与主数据框连接,然后根据周数之间的差异进行操作:

from pyspark.sql import functions as F

df = df.withColumn("date",F.to_date("date"))
tmp = (df.groupBy("id1","id2").agg(F.min("date").alias("Mindate")
                                   ,F.max("date").alias("Maxdate"))
         .withColumn("MinMonday",F.next_day("Mindate","Mon"))
         .withColumn("MaxMonday",F.next_day("Maxdate","Mon"))
         .withColumn("Seq",
          F.explode(F.expr("sequence(MinMonday,MaxMonday,interval 7 day)")))
         .drop("Mindate","Maxdate","MinMonday","MaxMonday"))


def maskedvalue(col) : return f"""CASE WHEN weekdiff <=1 THEN col ELSE 0 END"""
out = (df.alias("left").join(tmp.alias("right"),
             on=[df['id1']==tmp['id1'],df['id2']==tmp['id2'],df['date']<=tmp['Seq']])
.select("date","left.id1","left.id2","Seq","value1","value2")
.withColumn("weekdiff",F.weekofyear("Seq")-F.weekofyear("date"))
.withColumn("value1",F.expr(maskedvalue(("value1"))))
.withColumn("value2",F.expr(maskedvalue(("value2"))))
.groupBy("id1","id2","Seq").agg(F.sum("value1").alias("value1")
                                ,F.sum("value2").alias("value2"))
.withColumnRenamed("Seq","Date")
)

out.orderBy("id1","id2","Date").show()


+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b1|2021-12-06|     6|     8|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
| a1| b1|2021-12-27|     7|     8|
| a1| b2|2021-12-06|     2|     1|
| a1| b2|2021-12-13|     9|     9|
| a2| b3|2021-12-06|     3|     3|
| a2| b4|2021-12-06|     8|     8|
+---+---+----------+------+------+

请注意,tmp 数据框如下所示:

+---+---+----------+
|id1|id2|       Seq|
+---+---+----------+
| a1| b1|2021-12-06|
| a1| b1|2021-12-13|
| a1| b1|2021-12-20|
| a1| b1|2021-12-27|
| a1| b2|2021-12-06|
| a1| b2|2021-12-13|
| a2| b3|2021-12-06|
| a2| b4|2021-12-06|
+---+---+----------+

【讨论】:

以上是关于填写两组缺失的日期并将数据转换为 Spark dataFrame 中的每周的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中重新索引和填充缺失的日期

转发新行填写缺失日期的帐户

使用带有多个键的 Grouper 时填写缺失的日期

Spark使用类将rdd转换为数据框

根据 max 和 min 填写缺失的日期 pandas

根据 max 和 min 填写缺失的日期 pandas