pyspark:在日期和时间上重新采样 pyspark 数据帧
Posted
技术标签:
【中文标题】pyspark:在日期和时间上重新采样 pyspark 数据帧【英文标题】:pyspark: resampling pyspark dataframe on date and time 【发布时间】:2020-06-28 13:27:27 【问题描述】:如何重新采样 pyspark 数据帧,就像在 pandas 中我们有 pd.grouper 和 pd.resample 一样,我可以在 h、2h、3h、week 上重新采样。我有以下示例 pyspark 数据框,我如何在列 ind 和 date 以及 every h/2h/3h
上聚合它>from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
a = sqlContext.createDataFrame([["Anand", "2020-02-01 16:00:00", 12, "ba"],
["Anand", "2020-02-01 16:05:00", 7, "ba" ]
["Anand", "2020-02-02 19:10:00", 14,"sa"],
["Carl", "2020-02-01 16:00:00", 16,"da"],
["Carl", "2020-02-02 16:02:00", 12,"ga"],
["Carl", "2020-02-02 17:10:00", 1,"ga"],
["Eric", "2020-02-01 16:o0:00", 24, "sa"]], ['ind',"date","sal","imp"])
a.show()
| ind| date|sal|imp|
+-----+-------------------+---+---+
|Anand|2020-02-01 16:00:00| 12| ba|
|Anand|2020-02-01 16:05:00| 7| sa|
|Anand|2020-02-02 19:10:00| 14| sa|
| Carl|2020-02-01 16:00:00| 16| da|
| Carl|2020-02-01 16:02:00| 12| ga|
| Carl|2020-02-02 17:10:00| 1| ga|
| Eric|2020-02-01 16:00:00| 24| sa|
因此,当聚合列 ind 并在 date(everyhour) 和 mean 的 sale 上重新采样时,期望输出可能看起来像
| ind| date|sal|
+-----+-------------------+---+
|Anand|2020-02-01 16:00:00| 9|
|Anand|2020-02-02 19:00:00| 14|
| Carl|2020-02-01 16:00:00| 9|
| Carl|2020-02-02 17:00:00| 1|
| Eric|2020-02-01 16:00:00| 24|
【问题讨论】:
【参考方案1】:您可以完全按照您在问题中描述的方式进行操作:按ind
和date
分组。在date_trunc 的帮助下,我们可以在分组前将日期列四舍五入:
from pyspark.sql import functions as F
a.groupBy('ind', F.date_trunc('hour', F.col('date')).alias('date'))\
.agg(F.mean('sal')) \
.orderBy('ind', 'date') \
.show()
打印
+-----+-------------------+--------+
| ind| date|avg(sal)|
+-----+-------------------+--------+
|Anand|2020-02-01 16:00:00| 9.5|
|Anand|2020-02-02 19:00:00| 14.0|
| Carl|2020-02-01 16:00:00| 14.0|
| Carl|2020-02-02 17:00:00| 1.0|
| Eric|2020-02-01 16:00:00| 24.0|
+-----+-------------------+--------+
【讨论】:
TILdate_trunc
,会很有用的
感谢 1 天的好解决方案。如果我有多个日期并且想像熊猫一样在 2 小时、3 小时进行聚合怎么办
@werner 另外,请解释一下 date_trunc 是如何工作的,我可以从日期中看到一些“小时”提取 n
@ManuSharma 我认为 2h 或 3h 不适用于 date_trunc
。 date_trunc
只是时间戳的某种“舍入”功能。对于 1 小时(如您最初的要求),它会起作用,但此解决方案仅提供有限的灵活性
@anky 问题扩大了一点,也许你可以取消删除你的答案?【参考方案2】:
一种可能的方法是使用 2 个窗口 1 来确定 ind
和 date
分区上的时间差是否在 1 小时内,其次使用上述窗口和 time_diff
计算得到平均值(注意:Anand
(12+7)/2 = 9.5 与预期输出中的 9 相对比):
one_hrs= 1*60*60
w = Window.partitionBy("ind",F.to_date("date"))
w1 = Window.partitionBy("ind",F.to_date("date"),"time_diff")
(df.withColumn("date",F.to_timestamp("date"))
.withColumn("first_date",F.first("date").over(w))
.withColumn("time_diff",((F.unix_timestamp("date")-F.unix_timestamp("first_date"))
<=one_hrs).cast("Integer"))
.withColumn("sal",F.mean("sal").over(w1)).dropDuplicates(["ind","sal","time_diff"])
.drop("first_date","time_diff").orderBy("ind")).show()
+-----+-------------------+----+---+
| ind| date| sal|imp|
+-----+-------------------+----+---+
|Anand|2020-02-02 19:10:00|14.0| sa|
|Anand|2020-02-01 16:00:00| 9.5| ba|
| Carl|2020-02-01 16:00:00|14.0| da|
| Carl|2020-02-02 17:10:00| 1.0| ga|
| Eric|2020-02-01 16:00:00|24.0| sa|
+-----+-------------------+----+---+
【讨论】:
【参考方案3】:看到您将日期作为字符串,一种粗略的方法是拆分和聚合。
import pyspark.sql.functions as F
a = sqlContext.createDataFrame([["Anand", "2020-02-01 16:00:00", 12, "ba"],
["Anand", "2020-02-01 16:05:00", 7, "ba"],
["Anand", "2020-02-02 19:10:00", 14,"sa"],
["Carl", "2020-02-01 16:00:00", 16,"da"],
["Carl", "2020-02-02 16:02:00", 12,"ga"],
["Carl", "2020-02-02 17:10:00", 1,"ga"],
["Eric", "2020-02-01 16:o0:00", 24, "sa"]], ['ind',"date","sal","imp"])
a_spli = a.withColumn("hour",F.split(F.col('date'),':')[0])
test_res = a_spli.groupby('ind','hour').agg(F.mean('sal'))
sparkts 是一个处理时间相关任务的酷库:https://github.com/sryza/spark-timeseries。看看吧。
test_res.show()
+-----+-------------+--------+
| ind| hour|avg(sal)|
+-----+-------------+--------+
|Anand|2020-02-01 16| 9.5|
|Anand|2020-02-02 19| 14.0|
| Carl|2020-02-01 16| 16.0|
| Carl|2020-02-02 16| 12.0|
| Carl|2020-02-02 17| 1.0|
| Eric|2020-02-01 16| 24.0|
+-----+-------------+--------+
【讨论】:
以上是关于pyspark:在日期和时间上重新采样 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)