pyspark:使用自定义时间序列数据的滚动平均值
Posted
技术标签:
【中文标题】pyspark:使用自定义时间序列数据的滚动平均值【英文标题】:pyspark: rolling average using Custom timeseries data 【发布时间】:2020-01-21 05:32:37 【问题描述】:嗨,我的基础数据框是这样的。
'|stockId|timeStamp|stockPrice|'
+-------+---------+----------+
| 101| 1| 53.0|
| 101| 2| 15.0|
| 101| 3| 57.0|
| 101| 4| 71.0|
| 101| 5| 86.0|
这是我的代码,它转换 days.followed by window 和 average window 。
days=lambda i:i*86400
W=Window.partitionBy(F.col('stockId')).orderBy(F.col('epoch_time').cast("timestamp").cast("long")).rangeBetween(-days(3),0)
Df=.withColumn("current_timestamp",F.unix_timestamp(F.lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))\
.withColumn("epoch",F.unix_timestamp("current_timestamp"))\
.withColumn("epoch_time",F.concat(F.col("epoch")+F.col("timeStamp")))\
.withColumn("moving_avg",F.avg("stockPrice").over(W))
这是我的结果。
+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice| current_timestamp| epoch|epoch_time| moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
| 101| 1| 53.0|2020-01-21 10:53:43|1579584223|1579584224|48.21782178217822|
| 101| 2| 15.0|2020-01-21 10:53:43|1579584223|1579584225|48.21782178217822|
| 101| 3| 57.0|2020-01-21 10:53:43|1579584223|1579584226|48.21782178217822|
| 101| 4| 86.0|2020-01-21 10:53:43|1579584223|1579584227|48.21782178217822|
预期输出
+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice| current_timestamp| epoch|epoch_time| moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
| 101| 3| 57.0|2020-01-21 10:53:43|1579584223|1579584226|41.67|
| 101| 4| 71.0|2020-01-21 10:53:43|1579584223|1579584227|47.67|
| 101| 5| 71.0|2020-01-21 10:53:43|1579584223|1579584227|71.33|
【问题讨论】:
我的天数是多少? 它是一个 lambda 函数,可以转换为秒 24*60*60 谢谢你,你在什么基础上不需要前两行? 为了更好地理解我一直这样。但滚动窗口超过 3 天。 【参考方案1】:W=Window.partitionBy(F.col('stockId')).orderBy(F.col('epoch_time').cast("timestamp").cast("long")).rangeBetween(-2,0)
+-------+---------+----------+-------------------+----------+----------+------
|stockId|timeStamp|stockPrice| current_timestamp| epoch|epoch_time| moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
| 101| 3| 57.0|2020-01-21 10:53:43|1579584223|1579584226|41.67|
| 101| 4| 71.0|2020-01-21 10:53:43|1579584223|1579584227|47.67|
| 101| 5| 71.0|2020-01-21 10:53:43|1579584223|1579584227|71.33|
保持滚动后window rangeBetween(-2,0)
平均值从current row
到3rd row of the table
。
【讨论】:
以上是关于pyspark:使用自定义时间序列数据的滚动平均值的主要内容,如果未能解决你的问题,请参考以下文章
pandas使用rolling函数计算dataframe指定数据列特定窗口下的滚动均值(rolling mean)自定义指定滚动窗口的大小(window size)