转发新行填写缺失日期的帐户

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转发新行填写缺失日期的帐户相关的知识,希望对你有一定的参考价值。

我目前有一个数据集,按变量“聚合器”分组为每小时增量。这个每小时数据中存在间隙,我理想的做法是使用前一行向前填充行,该行映射到列x中的变量。

我已经看到了使用PANDAS的类似问题的一些解决方案,但理想情况下我想了解如何使用pyspark UDF最好地解决这个问题。

我最初考虑过像PANDAS这样的东西,但也努力实现这个只是填写忽略聚合器作为第一遍:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

但理想情况下我想避免使用PANDAS。

在下面的示例中,我有两行每小时数据(标记为MISSING)。

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

这里的预期输出如下:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

感谢帮助。

谢谢。

答案

这是解决方案,以填补缺少的时间。使用windows,lag和udf。几乎没有修改,它也可以延长到几天。

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv',header=True,inferSchema=True)

window = Window.partitionBy("aggregator").orderBy("timestamp")

df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))
       .filter(col("prev_timestamp").isNotNull())
       .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))
       .drop("prev_timestamp")

df.union(df_mising).orderBy("aggregator","timestamp").show()

结果

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+

以上是关于转发新行填写缺失日期的帐户的主要内容,如果未能解决你的问题,请参考以下文章

填写缺失的日期值并根据前一行填充第二列

ORACLE SQL:填写缺失的日期

如何在 google BigQuery 中填写缺失的日期

根据 max 和 min 填写缺失的日期 pandas

根据 max 和 min 填写缺失的日期 pandas

填写运行总计的缺失日期