RxSwift - 如何限制缓冲区的时间跨度

Posted

技术标签:

【中文标题】RxSwift - 如何限制缓冲区的时间跨度【英文标题】:RxSwift - How to throttle buffer's time span 【发布时间】:2017-04-07 19:38:50 【问题描述】:

我正在尝试重新创建一个代码 sn-p,它基本上计算一个按钮连续单击的次数。代码在 RxJS 中,我正在尝试将其转换为 RxSwift 以用于学习目的,但无法弄清楚缓冲区和节流部分。

You can see the js code on jsfiddle

目前我有这个

  tapButton.rx.tap      
  .buffer(timeSpan: 0.25, count: 10, scheduler: MainScheduler.instance)
  .map $0.count
  .filter  $0 >= 2 
  .subscribe(onNext:  events in
    print(events)
  ).addDisposableTo(disposeBag)

我不知道如何才能延迟到点击结束并收集自上次发射以来的所有值,就像在 RxJS 示例中一样。

【问题讨论】:

【参考方案1】:

您遇到的问题是因为 RxSwift buffer 运算符不像 RxJS buffer 运算符那样工作。它的工作方式更像 RxJS bufferWithTimeOrCount 运算符。

目前,从 3.4.0 版开始,没有与 buffer 运算符等效的功能。它的签名类似于func buffer(_ boundary: Observer<BoundaryType>) -> Observable<[E]>

这是一个有趣的问题。我最终制作了一个缓冲区运算符,我在这个答案的底部提供了它。下面是我将如何写出在 Andre 的代码中定义的解决方案:

    let trigger = button.rx.tap.debounce(0.25, scheduler: MainScheduler.instance)
    let clickStream = button.rx.tap.asObservable()
        .buffer(trigger)
        .map  $0.count 
        .map  $0 == 1 ? "click" : "\($0)x clicks" 

    let clearStream = clickStream
        .debounce(10.0, scheduler: MainScheduler.instance)
        .map  _ in "" 

    Observable.merge([clickStream, clearStream])
        .bind(to: label.rx.text)
        .disposed(by: bag)

上面的代码应该放在视图控制器的viewDidLoad方法中。我做了一个大改变和一个小改变。小的变化是我使用去抖动而不是油门。同样,我认为 RxJS 的油门与 RxSwift 的油门不同。最大的变化是我结合了他的 multiClickStream 和 singleClickStream。我不完全确定他为什么制作两个单独的流......

我所做的另一项更改是将所有影响标签的可观察对象滚动到标签可以绑定到的可观察对象中,而不是使用不同的可观察对象。我认为这更清洁。

下面是我定义的缓冲区操作符。

extension Observable 

    /// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
    func buffer<U>(_ boundary: Observable<U>) -> Observable<[E]> 
        return Observable<[E]>.create  observer in
            var buffer: [E] = []
            let lock = NSRecursiveLock()
            let boundaryDisposable = boundary.subscribe  event in
                lock.lock(); defer  lock.unlock() 
                switch event 
                case .next:
                    observer.onNext(buffer)
                    buffer = []
                default:
                    break
                
            
            let disposable = self.subscribe  event in
                lock.lock(); defer  lock.unlock() 
                switch event 
                case .next(let element):
                    buffer.append(element)
                case .completed:
                    observer.onNext(buffer)
                    observer.onCompleted()
                case .error(let error):
                    observer.onError(error)
                    buffer = []
                
            
            return Disposables.create([disposable, boundaryDisposable])
        
    

【讨论】:

感谢您的回答。如果有人提出解决方案或者我自己想出办法,我会让这个问题更悬而未决。 你可能会觉得这个讨论很有趣:github.com/ReactiveX/RxSwift/issues/590 非常感谢您的解决方案和解释。 ?

以上是关于RxSwift - 如何限制缓冲区的时间跨度的主要内容,如果未能解决你的问题,请参考以下文章

Golang - 如何克服 bufio 的 Scan() 缓冲区限制?

如何限制管道(windows)的缓冲区大小?

使用 Dataset.groupByKey 时如何绕过 2GB 缓冲区限制?

我如何限制发送到Stomp队列(处理websocket)的数据量,以便可以保证不会溢出缓冲区?

达到输出大小限制后,缓冲数据被截断

对缓冲区大小施加限制