Apache Spark:指数移动平均线
Posted
技术标签:
【中文标题】Apache Spark:指数移动平均线【英文标题】:Apache Spark: Exponential Moving Average 【发布时间】:2015-11-27 14:15:53 【问题描述】:我正在使用 Spark/Scala 编写一个应用程序,我需要在其中计算一列的指数移动平均值。
EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6)
我面临的问题是我需要同一列的先前计算的值(EMA_t-1)。通过 mysql,这可以通过使用 MODEL 或通过创建一个 EMA 列来实现,然后您可以更新每行的行,但我已经尝试过了,并且既不能使用 Spark SQL 也不能使用 Hive 上下文......有什么办法我可以访问这个 EMA_t-1?
我的数据如下所示:
timestamp price
15:31 132.3
15:32 132.48
15:33 132.76
15:34 132.66
15:35 132.71
15:36 132.52
15:37 132.63
15:38 132.575
15:39 132.57
所以我需要添加一个新列,其中我的第一个值只是第一行的价格,然后我需要使用以前的值:EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6)计算该列中的以下行。 我的 EMA 列必须是:
EMA
132.3
132.372
132.5272
132.58032
132.632192
132.5873152
132.6043891
132.5926335
132.5835801
我目前正在尝试使用 Spark SQL 和 Hive 来实现它,但如果可以以其他方式实现它,这将同样受欢迎!我也想知道如何使用 Spark Streaming 做到这一点。我的数据在数据框中,我使用的是 Spark 1.4.1。
非常感谢您提供的任何帮助!
【问题讨论】:
我认为您的用例不太适合大数据环境,因为它的功能是并行处理数据,而您的用例不允许这样做...您有多少记录有吗? @mark91 每个数据集大约有 100 000 行,需要分析大约 100 个数据集。我需要计算这个的原因是我需要这个作为输入来计算一个特征。我需要使用这些功能来训练带有随机森林的模型。我的模型必须根据几个特征的值来预测价格是上涨还是下跌。我还需要在未来实现这一点。 那么在我看来,最好的选择是分配数据,例如每个数据集都在一个分区中(每个分区都包含一个数据集),然后您独立处理每个分区.. 【参考方案1】:回答你的问题:
我面临的问题是我需要同一列的先前计算的值(EMA_t-1)
我认为你需要两个函数:Window 和 Lag。 (为方便计算 EMA 时,我还将 null 值设为零)
my_window = Window.orderBy("timestamp")
df.withColumn("price_lag_1",when(lag(col("price"),1).over(my_window).isNull,lit(0)).otherwise(lag(col("price"),1).over(my_window)))
我也是 Spark Scala 的新手,我想看看我是否可以定义一个 UDF 来做指数平均。但是现在一个明显的走动将是手动添加所有滞后列( 0.4 * lag0 + 0.4*0.6*lag1 + 0.4 * 0.6^2*lag2 ...)这样的事情
df.withColumn("ema_price",
price * lit(0.4) * Math.pow(0.6,0) +
lag(col("price"),1).over(my_window) * 0.4 * Math.pow(0.6,1) +
lag(col("price"),2).over(my_window) * 0.4 * Math.pow(0.6,2) + .... )
我忽略了 when.otherwise 以使其更清楚。现在这个方法对我有用..
----更新----
def emaFunc (y: org.apache.spark.sql.Column, group: org.apache.spark.sql.Column, order: org.apache.spark.sql.Column, beta: Double, lookBack: Int) : org.apache.spark.sql.Column =
val ema_window = Window.partitionBy(group).orderBy(order)
var i = 1
var result = y
while (i < lookBack)
result = result + lit(1) * ( when(lag(y,i).over(ema_window).isNull,lit(0)).otherwise(lag(y,i).over(ema_window)) * beta * Math.pow((1-beta),i)
- when(lag(y,i).over(ema_window).isNull,lit(0)).otherwise(y * beta * Math.pow((1-beta),i)) )
i = i + 1
return result
通过使用这个函数,你应该能够得到类似价格的 EMA..
df.withColumn("one",lit(1))
.withColumn("ema_price", emaFunc('price,'one,'timestamp,0.1,10)
这将回顾 10 天并计算 beta=0.1 的估计 EMA。 “一”列只是一个占位符,因为您没有分组列。
【讨论】:
能否告诉我如何计算 beta 为 0.1 的计算方法。我正在尝试上面的代码并找到 EMA 10 的值。我没有得到正确的值,所以我想我需要更改 beta 值,所以你能帮我计算 beta 0.1 吗? Beta 就像一个衰减因子。它是参数。您设置测试版取决于您对今天的关注程度与您对历史的关注程度。 @M 如果我不使用这个 beta 参数怎么办?因为我没有得到我想要的? @MaheshGupta 我猜这个公式需要你设置 beta(en.wikipedia.org/wiki/Moving_average#Exponential_moving_average),或者你可以多谈谈你到底想要什么?【参考方案2】:您应该可以使用 1.4 中引入的 Spark 窗口函数来执行此操作:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
w = Window().partitionBy().orderBy(col("timestamp")) df.select("*", lag("price").over(w).alias("ema"))
这将为您选择最后的价格,以便您对其进行计算
【讨论】:
以上是关于Apache Spark:指数移动平均线的主要内容,如果未能解决你的问题,请参考以下文章