RxSwift observable 创建,它采用流本身的最后一个值
Posted
技术标签:
【中文标题】RxSwift observable 创建,它采用流本身的最后一个值【英文标题】:RxSwift observable creation which takes the last value of the stream itself 【发布时间】:2021-01-03 06:09:58 【问题描述】:我正在尝试使用 RxSwift 计算 SMA(简单移动平均线)和 EMA(指数加权移动平均线)
设置如下,方法以收盘价流作为输入Observable<Double>
。所以每次如果有一个新的close
价格被发出,sma obervable 就会向流发出一个新的计算值
我完成了 SMA 版本,它工作正常
func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?>
let bag = DisposeBag()
return Observable<Double?>.create observer -> Disposable in
source.scan([]) Array($0 + [$1].suffix(length)) .subscribe(onNext: value in
if value.count < length
observer.onNext(nil)
else
observer.onNext(value.reduce(0.0, $0 + $1 / Double(length) ))
).disposed(by: bag)
return Disposables.create()
但是EMA公式有点复杂
https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp
公式涉及之前的 EMA 值。
我不清楚如何在 Observable 创建块中获取流的最后一个值:thinking
下面是我尝试实现的代码,但是.withLatestFrom(ema(source, length))
没有成功
func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?>
let bag = DisposeBag()
return Observable<Double?>.create observer -> Disposable in
source.scan([]) Array($0 + [$1].suffix(length)) .withLatestFrom(ema(source, length)) return ($0, $1)
.subscribe(onNext: value in
let alpha: Double = Double(2) / Double(length + 1)
let src = value.0
var sum: Double? = 0.0
let sum1 = value.1
sum = na(sum1) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(sum1)
observer.onNext(sum)
).disposed(by: bag)
return Disposables.create()
非常感谢任何帮助:祈祷
【问题讨论】:
【参考方案1】:首先让我们清理您的 sma 运算符。您在不合适的功能内创建处理袋。订阅返回一个一次性,创建的闭包需要返回一个一次性。只需返回订阅者的一次性...
func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?>
Observable<Double?>.create observer -> Disposable in
source
.scan([]) Array($0 + [$1].suffix(length))
.subscribe(onNext: value in
if value.count < length
observer.onNext(nil)
else
observer.onNext(value.reduce(0.0, $0 + $1 / Double(length) ))
)
但由于您输出的事件数与输入的事件数相同,因此我们可以进一步简化。每当您输出与输入相同数量的事件时,请考虑map
。
func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?>
source
.scan([]) Array($0 + [$1].suffix(length))
.map value in
if value.count < length
return nil
else
return value.reduce(0.0, $0 + $1 / Double(length) )
当您只有一个 Observable 作为输入时,请考虑将其作为 Observable 类型的扩展,以便可以轻松地将其插入到链中...
extension ObservableType where Element == Double
func sma(_ length: Int) -> Observable<Double?>
scan([]) Array($0 + [$1].suffix(length))
.map $0.count < length ? nil : $0.reduce(0.0, $0 + $1 / Double(length) )
既然我们已经完成了所有这些,让我们来解决您的实际问题。首先将您的公式表达为函数...
func getEMA(prices: [Double], k: Double? = nil) -> Double
guard !prices.isEmpty else return 0
let k = k ?? Double(2 / (prices.count + 1))
return prices[0] * k + getEMA(prices: prices.suffix(prices.count - 1), k: k) * (1 - k)
使用一些样本值应该很容易测试上述内容。我会让你去做。完成上述操作后,我们可以使用与 sma
相同的模式来创建运算符:
extension ObservableType where Element == Double
func ema(_ length: Int) -> Observable<Double?>
scan([]) Array([$1] + $0).suffix(length) // put the most recent price in front to correctly handle the formula
.map $0.count < length ? nil : getEMA(prices: $0)
【讨论】:
我最初对getEMA
公式的表达是错误的。我用正确的表达方式更新了答案。
非常感谢@Daniel T。SMA 的修订版看起来更干净。关于 EMA 的问题是,它需要 EMA-1 才能计算下一个 EMA,这就像一个递归问题。我找不到 rx 运算符来在流创建阶段直接检索当前流的最后一个值。所以我想出了这个,***.com/a/65549401/1005570
我贴的代码是递归的,解决了问题。【参考方案2】:
我找到了一个解决方案(不确定它是否最干净)
这是在函数声明内部同时在 Observable.create
块外部创建一个 BehaviourRelay
,该块保留计算的最新 EMA 的本地副本。
这样,它不需要函数的消费者依赖注入Observable
,也不需要复杂的流转换。
以下是使用 RxSwift 实现的 EMA(指数加权移动平均线)
func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?>
let bag = DisposeBag()
let lastEMA: BehaviorRelay<Double?> = BehaviorRelay.init(value: nil)
return Observable<Double?>.create observer -> Disposable in
source.scan([]) Array($0 + [$1].suffix(length))
.subscribe(onNext: value in
let alpha: Double = Double(2) / Double(length + 1)
let src = value
var sum: Double? = 0.0
sum = na(lastEMA.value) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(lastEMA.value)
observer.onNext(sum)
lastEMA.accept(sum)
).disposed(by: bag)
return Disposables.create()
备注:
na
和 nz
是从 TradingView
pinescript 复制的方法
https://www.tradingview.com/pine-script-reference/#var_na
https://www.tradingview.com/pine-script-reference/#fun_nz
https://www.tradingview.com/pine-script-reference/#fun_ema
【讨论】:
以上是关于RxSwift observable 创建,它采用流本身的最后一个值的主要内容,如果未能解决你的问题,请参考以下文章