pyspark:在日期和时间上重新采样 pyspark 数据帧

Posted

技术标签:

【中文标题】pyspark:在日期和时间上重新采样 pyspark 数据帧【英文标题】:pyspark: resampling pyspark dataframe on date and time 【发布时间】:2020-06-28 13:27:27 【问题描述】:

如何重新采样 pyspark 数据帧,就像在 pandas 中我们有 pd.grouper 和 pd.resample 一样,我可以在 h、2h、3h、week 上重新采样。我有以下示例 pyspark 数据框,我如何在列 inddate 以及 every h/2h/3h

上聚合它>
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

a = sqlContext.createDataFrame([["Anand", "2020-02-01 16:00:00", 12, "ba"],
                         ["Anand", "2020-02-01 16:05:00", 7, "ba" ]
                        ["Anand", "2020-02-02 19:10:00", 14,"sa"], 
                        ["Carl", "2020-02-01 16:00:00", 16,"da"], 
                        ["Carl", "2020-02-02 16:02:00", 12,"ga"],
                        ["Carl", "2020-02-02 17:10:00", 1,"ga"],
                        ["Eric", "2020-02-01 16:o0:00", 24, "sa"]], ['ind',"date","sal","imp"])
a.show()

|  ind|               date|sal|imp|
+-----+-------------------+---+---+
|Anand|2020-02-01 16:00:00| 12| ba|
|Anand|2020-02-01 16:05:00|  7| sa|
|Anand|2020-02-02 19:10:00| 14| sa|
| Carl|2020-02-01 16:00:00| 16| da|
| Carl|2020-02-01 16:02:00| 12| ga|
| Carl|2020-02-02 17:10:00|  1| ga|
| Eric|2020-02-01 16:00:00| 24| sa|

因此,当聚合列 ind 并在 date(everyhour)meansale 上重新采样时,期望输出可能看起来像

|  ind|               date|sal|
+-----+-------------------+---+
|Anand|2020-02-01 16:00:00|  9|
|Anand|2020-02-02 19:00:00| 14|
| Carl|2020-02-01 16:00:00|  9|
| Carl|2020-02-02 17:00:00|  1|
| Eric|2020-02-01 16:00:00| 24|

【问题讨论】:

【参考方案1】:

您可以完全按照您在问题中描述的方式进行操作:按inddate 分组。在date_trunc 的帮助下,我们可以在分组前将日期列四舍五入:

from pyspark.sql import functions as F
a.groupBy('ind', F.date_trunc('hour', F.col('date')).alias('date'))\
   .agg(F.mean('sal')) \
   .orderBy('ind', 'date') \
   .show()

打印

+-----+-------------------+--------+
|  ind|               date|avg(sal)|
+-----+-------------------+--------+
|Anand|2020-02-01 16:00:00|     9.5|
|Anand|2020-02-02 19:00:00|    14.0|
| Carl|2020-02-01 16:00:00|    14.0|
| Carl|2020-02-02 17:00:00|     1.0|
| Eric|2020-02-01 16:00:00|    24.0|
+-----+-------------------+--------+

【讨论】:

TIL date_trunc,会很有用的 感谢 1 天的好解决方案。如果我有多个日期并且想像熊猫一样在 2 小时、3 小时进行聚合怎么办 @werner 另外,请解释一下 date_trunc 是如何工作的,我可以从日期中看到一些“小时”提取 n @ManuSharma 我认为 2h 或 3h 不适用于 date_truncdate_trunc 只是时间戳的某种“舍入”功能。对于 1 小时(如您最初的要求),它会起作用,但此解决方案仅提供有限的灵活性 @anky 问题扩大了一点,也许你可以取消删除你的答案?【参考方案2】:

一种可能的方法是使用 2 个窗口 1 来确定 inddate 分区上的时间差是否在 1 小时内,其次使用上述窗口和 time_diff 计算得到平均值(注意:Anand (12+7)/2 = 9.5 与预期输出中的 9 相对比):

one_hrs= 1*60*60
w = Window.partitionBy("ind",F.to_date("date"))
w1 = Window.partitionBy("ind",F.to_date("date"),"time_diff")

(df.withColumn("date",F.to_timestamp("date"))
   .withColumn("first_date",F.first("date").over(w))
   .withColumn("time_diff",((F.unix_timestamp("date")-F.unix_timestamp("first_date"))
    <=one_hrs).cast("Integer"))
 .withColumn("sal",F.mean("sal").over(w1)).dropDuplicates(["ind","sal","time_diff"])
 .drop("first_date","time_diff").orderBy("ind")).show()

+-----+-------------------+----+---+
|  ind|               date| sal|imp|
+-----+-------------------+----+---+
|Anand|2020-02-02 19:10:00|14.0| sa|
|Anand|2020-02-01 16:00:00| 9.5| ba|
| Carl|2020-02-01 16:00:00|14.0| da|
| Carl|2020-02-02 17:10:00| 1.0| ga|
| Eric|2020-02-01 16:00:00|24.0| sa|
+-----+-------------------+----+---+

【讨论】:

【参考方案3】:

看到您将日期作为字符串,一种粗略的方法是拆分和聚合。

import pyspark.sql.functions as F
a = sqlContext.createDataFrame([["Anand", "2020-02-01 16:00:00", 12, "ba"],
                         ["Anand", "2020-02-01 16:05:00", 7, "ba"],
                        ["Anand", "2020-02-02 19:10:00", 14,"sa"], 
                        ["Carl", "2020-02-01 16:00:00", 16,"da"], 
                        ["Carl", "2020-02-02 16:02:00", 12,"ga"],
                        ["Carl", "2020-02-02 17:10:00", 1,"ga"],
                        ["Eric", "2020-02-01 16:o0:00", 24, "sa"]], ['ind',"date","sal","imp"])
a_spli = a.withColumn("hour",F.split(F.col('date'),':')[0])

test_res = a_spli.groupby('ind','hour').agg(F.mean('sal'))

sparkts 是一个处理时间相关任务的酷库:https://github.com/sryza/spark-timeseries。看看吧。

test_res.show()
+-----+-------------+--------+
|  ind|         hour|avg(sal)|
+-----+-------------+--------+
|Anand|2020-02-01 16|     9.5|
|Anand|2020-02-02 19|    14.0|
| Carl|2020-02-01 16|    16.0|
| Carl|2020-02-02 16|    12.0|
| Carl|2020-02-02 17|     1.0|
| Eric|2020-02-01 16|    24.0|
+-----+-------------+--------+

【讨论】:

以上是关于pyspark:在日期和时间上重新采样 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)

上采样日期时间 - ValueError:无法使用方法或限制重新索引非唯一索引

在 PySpark 中重新索引和填充缺失的日期

根据不同日期重新采样时间序列

使用 Pandas 将每日数据重新采样为每月(日期格式)

在 pandas 中有效地聚合重新采样的日期时间集合