RxSwift observable 创建,它采用流本身的最后一个值

Posted

技术标签:

【中文标题】RxSwift observable 创建,它采用流本身的最后一个值【英文标题】:RxSwift observable creation which takes the last value of the stream itself 【发布时间】:2021-01-03 06:09:58 【问题描述】:

我正在尝试使用 RxSwift 计算 SMA(简单移动平均线)和 EMA(指数加权移动平均线)

设置如下,方法以收盘价流作为输入Observable<Double>。所以每次如果有一个新的close 价格被发出,sma obervable 就会向流发出一个新的计算值

我完成了 SMA 版本,它工作正常

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> 
    let bag = DisposeBag()

    return Observable<Double?>.create  observer -> Disposable in
        source.scan([])  Array($0 + [$1].suffix(length)) .subscribe(onNext:  value in
                if value.count < length 
                    observer.onNext(nil)
                 else 
                    observer.onNext(value.reduce(0.0,  $0 + $1 / Double(length) ))
                
        ).disposed(by: bag)

        return Disposables.create()
    
 

但是EMA公式有点复杂

https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp

公式涉及之前的 EMA 值。

我不清楚如何在 Observable 创建块中获取流的最后一个值:thinking

下面是我尝试实现的代码,但是.withLatestFrom(ema(source, length)) 没有成功

func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> 
    let bag = DisposeBag()
    return Observable<Double?>.create  observer -> Disposable in
        source.scan([])  Array($0 + [$1].suffix(length)) .withLatestFrom(ema(source, length))  return ($0, $1) 
            .subscribe(onNext:  value in
                let alpha: Double = Double(2) / Double(length + 1)
                let src = value.0
                var sum: Double? = 0.0
                let sum1 = value.1
                sum = na(sum1) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(sum1)
            
                observer.onNext(sum)
        ).disposed(by: bag)

        return Disposables.create()
    

非常感谢任何帮助:祈祷

【问题讨论】:

【参考方案1】:

首先让我们清理您的 sma 运算符。您在不合适的功能内创建处理袋。订阅返回一个一次性,创建的闭包需要返回一个一次性。只需返回订阅者的一次性...

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> 
    Observable<Double?>.create  observer -> Disposable in
        source
            .scan([])  Array($0 + [$1].suffix(length)) 
            .subscribe(onNext:  value in
                if value.count < length 
                    observer.onNext(nil)
                 else 
                    observer.onNext(value.reduce(0.0,  $0 + $1 / Double(length) ))
                
            )
    

但由于您输出的事件数与输入的事件数相同,因此我们可以进一步简化。每当您输出与输入相同数量的事件时,请考虑map

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> 
    source
        .scan([])  Array($0 + [$1].suffix(length)) 
        .map  value in
            if value.count < length 
                return nil
             else 
                return value.reduce(0.0,  $0 + $1 / Double(length) )
            
        

当您只有一个 Observable 作为输入时,请考虑将其作为 Observable 类型的扩展,以便可以轻松地将其插入到链中...

extension ObservableType where Element == Double 
    func sma(_ length: Int) -> Observable<Double?> 
        scan([])  Array($0 + [$1].suffix(length)) 
            .map  $0.count < length ? nil : $0.reduce(0.0,  $0 + $1 / Double(length) ) 
    


既然我们已经完成了所有这些,让我们来解决您的实际问题。首先将您的公式表达为函数...

func getEMA(prices: [Double], k: Double? = nil) -> Double 
    guard !prices.isEmpty else  return 0 
    let k = k ?? Double(2 / (prices.count + 1))
    return prices[0] * k + getEMA(prices: prices.suffix(prices.count - 1), k: k) * (1 - k)

使用一些样本值应该很容易测试上述内容。我会让你去做。完成上述操作后,我们可以使用与 sma 相同的模式来创建运算符:

extension ObservableType where Element == Double 
    func ema(_ length: Int) -> Observable<Double?> 
        scan([])  Array([$1] + $0).suffix(length)  // put the most recent price in front to correctly handle the formula
            .map  $0.count < length ? nil : getEMA(prices: $0) 
    

【讨论】:

我最初对getEMA 公式的表达是错误的。我用正确的表达方式更新了答案。 非常感谢@Daniel T。SMA 的修订版看起来更干净。关于 EMA 的问题是,它需要 EMA-1 才能计算下一个 EMA,这就像一个递归问题。我找不到 rx 运算符来在流创建阶段直接检索当前流的最后一个值。所以我想出了这个,***.com/a/65549401/1005570 我贴的代码是递归的,解决了问题。【参考方案2】:

我找到了一个解决方案(不确定它是否最干净)

这是在函数声明内部同时在 Observable.create 块外部创建一个 BehaviourRelay,该块保留计算的最新 EMA 的本地副本。

这样,它不需要函数的消费者依赖注入Observable,也不需要复杂的流转换。

以下是使用 RxSwift 实现的 EMA(指数加权移动平均线)

func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> 
    let bag = DisposeBag()
    let lastEMA: BehaviorRelay<Double?> = BehaviorRelay.init(value: nil)
    return Observable<Double?>.create  observer -> Disposable in
        source.scan([])  Array($0 + [$1].suffix(length)) 
            .subscribe(onNext:  value in
                let alpha: Double = Double(2) / Double(length + 1)
                let src = value
                var sum: Double? = 0.0
                sum = na(lastEMA.value) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(lastEMA.value)
                observer.onNext(sum)
                lastEMA.accept(sum)
        ).disposed(by: bag)
        return Disposables.create()
    

备注:

nanz 是从 TradingView pinescript 复制的方法

https://www.tradingview.com/pine-script-reference/#var_na

https://www.tradingview.com/pine-script-reference/#fun_nz

https://www.tradingview.com/pine-script-reference/#fun_ema

【讨论】:

以上是关于RxSwift observable 创建,它采用流本身的最后一个值的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift:如何让一个 observable 触发另一个?

RxSwift 对一个 observable 的多个订阅

RxSwift 使 Observable 触发一个主题

RxSwift 系列

RxSwift之深入解析如何创建观察者Observer

IOS RxSwift 从 Observable 数组创建 Observable 数组