Swift Combine:缓冲上游值并以稳定的速率发出它们?

Posted

技术标签:

【中文标题】Swift Combine:缓冲上游值并以稳定的速率发出它们?【英文标题】:Swift Combine: Buffer upstream values and emit them at a steady rate? 【发布时间】:2020-04-09 12:58:58 【问题描述】:

ios 13 中使用新的 Combine 框架。

假设我有一个上游发布者以非常不规则的速率发送值 - 有时几秒钟或几分钟可能没有任何值,然后一串值可能会同时通过。我想创建一个自定义发布者,它订阅上游值,缓冲它们并在它们进入时以常规的已知节奏发出它们,但如果它们都用尽了,则什么也不发布。

举个具体的例子:

t = 0 到 5000 毫秒:未发布上游值 t = 5001ms:上游发布“a” t = 5002ms:上游发布“b” t = 5003ms:上游发布“c” t = 5004 毫秒到 10000 毫秒:未发布上游值 t = 10001ms:上游发布“d”

我订阅上游的发布者将每 1 秒产生一次值:

t = 0 到 5000 毫秒:未发布任何值 t = 5001ms:发布“a” t = 6001ms:发布“b” t = 7001ms:发布“c” t = 7001ms 到 10001ms:没有公布值 t = 10001ms:发布“d”

Combine 中现有的出版商或运营商似乎都相当做我想做的事。

throttledebounce 将简单地以特定节奏对上游值进行采样并删除丢失的值(例如,如果节奏为 1000 毫秒,则只会发布“a”) delay 会为每个值添加相同的延迟,但不会将它们分开(例如,如果我的延迟是 1000 毫秒,它将在 6001 毫秒发布“a”,在 6002 毫秒发布“b”,在 6003 毫秒发布“c”) buffer 似乎很有希望,但我不太清楚如何使用它——如何强制它按需从缓冲区中发布一个值。当我将接收器连接到 buffer 时,它似乎只是立即发布了所有值,根本没有缓冲。

我考虑过使用某种组合运算符,如 zipmergecombineLatest 并将其与 Timer 发布者组合,这可能是正确的方法,但我无法弄清楚如何配置它给出我想要的行为。

编辑

这是一个大理石图,希望能说明我的目标:

Upstream Publisher:
-A-B-C-------------------D-E-F--------|>

My Custom Operator:
-A----B----C-------------D----E----F--|>

编辑 2:单元测试

如果modulatedPublisher(我想要的缓冲发布者)按预期工作,则这是一个应该通过的单元测试。它并不完美,但它会在收到事件(包括收到的时间)时存储它们,然后比较事件之间的时间间隔,确保它们不小于所需的间隔。

func testCustomPublisher() 
    let expectation = XCTestExpectation(description: "async")
    var events = [Event]()

    let passthroughSubject = PassthroughSubject<Int, Never>()
    let cancellable = passthroughSubject
        .modulatedPublisher(interval: 1.0)
        .sink  value in
            events.append(Event(value: value, date: Date()))
            print("value received: \(value) at \(self.dateFormatter.string(from:Date()))")
        

    // WHEN I send 3 events, wait 6 seconds, and send 3 more events
    passthroughSubject.send(1)
    passthroughSubject.send(2)
    passthroughSubject.send(3)

    DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(6000)) 
        passthroughSubject.send(4)
        passthroughSubject.send(5)
        passthroughSubject.send(6)

        DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(4000)) 

            // THEN I expect the stored events to be no closer together in time than the interval of 1.0s
            for i in 1 ..< events.count 
                let interval = events[i].date.timeIntervalSince(events[i-1].date)
                print("Interval: \(interval)")

                // There's some small error in the interval but it should be about 1 second since I'm using a 1s modulated publisher.
                XCTAssertTrue(interval > 0.99)
            
            expectation.fulfill()
        
    

    wait(for: [expectation], timeout: 15)

我得到的最接近的是使用zip,就像这样:

public extension Publisher where Self.Failure == Never 
    func modulatedPublisher(interval: TimeInterval) -> AnyPublisher<Output, Never> 
        let timerBuffer = Timer
        .publish(every: interval, on: .main, in: .common)
        .autoconnect()

      return timerBuffer
        .zip(self,  $1 )                  // should emit one input element ($1) every timer tick
        .eraseToAnyPublisher()
    

这正确地协调了前三个事件(1、2 和 3),而不是后三个事件(4、5 和 6)。输出:

value received: 1 at 3:54:07.0007
value received: 2 at 3:54:08.0008
value received: 3 at 3:54:09.0009
value received: 4 at 3:54:12.0012
value received: 5 at 3:54:12.0012
value received: 6 at 3:54:12.0012

我相信这是因为zip 有一些内部缓冲能力。前三个上游事件被缓冲并在 Timer 的节奏上发出,但是在 6 秒的等待期间,Timer 的事件被缓冲 - 当第二个设置上游事件被触发时,队列中已经有 Timer 事件等待,所以它们'配对并立即发射。

【问题讨论】:

如果你能分享一些你所期望的(失败的)单元测试,这对我们(我)和你自己来说都非常有用。当作为测试实施时,您想要实现的目标总是更加清晰。你可以从我的测试中得到一些启发:***.com/a/58734595/1311272 你也可以结合期望来编写测试:github.com/groue/CombineExpectations 谢谢 Sajjon - 我会尝试写一个。不过,我从来没有为发布者写过单元测试,所以可能需要一点时间。我很好奇,它对你(和我)是否有用,仅仅是因为你可以开始试验代码并知道你什么时候因为通过了测试而成功了?或者它是否有用,因为它阐明了确切的要求?我相信我在问题中给出的例子是相当清楚的。 多半是因为需求变得清晰了 我认为我的链接将是一个很好的起点 @Sajjon 我已经更新了我的帖子,其中包含一个失败的单元测试和一个几乎但不完全按照我想要做的实现。 【参考方案1】:

这是一个有趣的问题。我玩过Timer.publishbufferzipthrottle 的各种组合,但我无法让任何组合完全按照你想要的方式工作。所以让我们编写一个自定义订阅者。

我们真正想要的是一个 API,当我们从上游获取输入时,我们还能够控制上游何时传递下一个输入。像这样的:

extension Publisher 
    /// Subscribe to me with a stepping function.
    /// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
    ///   Each time I call this function with an input, I also give it a promise function.
    ///   I won't deliver the next input until the promise is called with a `.more` argument.
    /// - returns: An object you can use to cancel the subscription asynchronously.
    func step(with stepper: @escaping (StepEvent<Output, Failure>) -> ()) -> AnyCancellable 
        ???
    


enum StepEvent<Input, Failure: Error> 
    /// Handle the Input. Call `StepPromise` when you're ready for the next Input,
    /// or to cancel the subscription.
    case input(Input, StepPromise)

    /// Upstream completed the subscription.
    case completion(Subscribers.Completion<Failure>)


/// The type of callback given to the stepper function to allow it to continue
/// or cancel the stream.
typealias StepPromise = (StepPromiseRequest) -> ()

enum StepPromiseRequest 
    // Pass this to the promise to request the next item from upstream.
    case more

    // Pass this to the promise to cancel the subscription.
    case cancel

有了这个step API,我们可以编写一个pace 操作符来做你想做的事:

extension Publisher 
    func pace<Context: Scheduler, MySubject: Subject>(
        _ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
        -> AnyCancellable
        where MySubject.Output == Output, MySubject.Failure == Failure
    
        return step 
            switch $0 
            case .input(let input, let promise):
                // Send the input from upstream now.
                subject.send(input)

                // Wait for the pace interval to elapse before requesting the
                // next input from upstream.
                scheduler.schedule(after: scheduler.now.advanced(by: pace)) 
                    promise(.more)
                

            case .completion(let completion):
                subject.send(completion: completion)
            
        
    

这个pace 运算符采用pace(输出之间所需的间隔)、一个调度事件的调度程序和一个重新发布上游输入的subject。它通过subject 发送每个输入来处理每个输入,然后使用调度程序等待步调间隔,然后再从上游请求下一个输入。

现在我们只需要实现step 操作符。在这里,Combine 并没有给我们太多帮助。它确实有一个称为“背压”的功能,这意味着发布者无法向下游发送输入,直到下游通过向上游发送 Subscribers.Demand 来请求它。通常你会看到下游向上游发送.unlimited 需求,但我们不会这样做。相反,我们将利用背压。在 stepper 完成 promise 之前,我们不会向上游发送任何需求,然后我们只会发送 .max(1) 的需求,因此我们使上游与 stepper 同步运行。 (我们还必须发送.max(1) 的初始需求才能开始整个过程​​。)

好的,所以需要实现一个接受步进函数并符合Subscriber 的类型。查看Reactive Streams JVM Specification 是个好主意,因为Combine 是基于该规范。

让实现变得困难的是,有几件事可以异步调用我们的订阅者:

上游可以从任何线程调用订阅者(但需要序列化其调用)。 在我们为步进器提供了 Promise 函数后,步进器可以在任何线程上调用这些 Promise。 我们希望订阅可以取消,并且可以在任何线程上取消。 所有这些异步性意味着我们必须用锁来保护我们的内部状态。 我们必须小心不要在持有该锁时调用,以避免死锁。

我们还将通过为每个 Promise 分配一个唯一的 id 来保护订户免受涉及重复调用 Promise 或调用过时 Promise 的恶作剧。

这是我们的基本订阅者定义:

import Combine
import Foundation

public class SteppingSubscriber<Input, Failure: Error> 

    public init(stepper: @escaping Stepper) 
        l_state = .subscribing(stepper)
    

    public typealias Stepper = (Event) -> ()

    public enum Event 
        case input(Input, Promise)
        case completion(Completion)
    

    public typealias Promise = (Request) -> ()

    public enum Request 
        case more
        case cancel
    

    public typealias Completion = Subscribers.Completion<Failure>

    private let lock = NSLock()

    // The l_ prefix means it must only be accessed while holding the lock.
    private var l_state: State
    private var l_nextPromiseId: PromiseId = 1

    private typealias PromiseId = Int

    private var noPromiseId: PromiseId  0 

请注意,我将之前的辅助类型(StepEventStepPromiseStepPromiseRequest)移至 SteppingSubscriber 并缩短了它们的名称。

现在让我们考虑l_state 的神秘类型State。我们的订阅者可能处于哪些不同的状态?

我们可能正在等待从上游接收 Subscription 对象。 我们可能已经收到来自上游的 Subscription 并正在等待信号(来自上游的输入或完成,或者来自步进器的承诺的完成)。 我们可能会调用步进器,我们要小心,以防它在我们调用它时完成承诺。 我们可能已被取消或已收到来自上游的完成。

这是我们对State的定义:

extension SteppingSubscriber 
    private enum State 
        // Completed or cancelled.
        case dead

        // Waiting for Subscription from upstream.
        case subscribing(Stepper)

        // Waiting for a signal from upstream or for the latest promise to be completed.
        case subscribed(Subscribed)

        // Calling out to the stopper.
        case stepping(Stepping)

        var subscription: Subscription? 
            switch self 
            case .dead: return nil
            case .subscribing(_): return nil
            case .subscribed(let subscribed): return subscribed.subscription
            case .stepping(let stepping): return stepping.subscribed.subscription
            
        

        struct Subscribed 
            var stepper: Stepper
            var subscription: Subscription
            var validPromiseId: PromiseId
        

        struct Stepping 
            var subscribed: Subscribed

            // If the stepper completes the current promise synchronously with .more,
            // I set this to true.
            var shouldRequestMore: Bool
        
    

由于我们使用NSLock(为简单起见),让我们定义一个扩展来确保我们始终匹配锁定和解锁:

fileprivate extension NSLock 
    @inline(__always)
    func sync<Answer>(_ body: () -> Answer) -> Answer 
        lock()
        defer  unlock() 
        return body()
    

现在我们准备好处理一些事件了。最容易处理的事件是异步取消,这是Cancellable 协议的唯一要求。如果我们处于除.dead 之外的任何状态,我们希望成为.dead,如果有上游订阅,请取消它。

extension SteppingSubscriber: Cancellable 
    public func cancel() 
        let sub: Subscription? = lock.sync 
            defer  l_state = .dead 
            return l_state.subscription
        
        sub?.cancel()
    

请注意,当lock 被锁定时,我不想调用上游订阅的cancel 函数,因为lock 不是递归锁,我不想冒死锁的风险。对lock.sync 的所有使用都遵循将任何调用推迟到解锁之后的模式。

现在让我们实现Subscriber 协议要求。首先,让我们处理从上游接收Subscription。唯一应该发生的情况是当我们处于 .subscribing 状态时,但 .dead 也是可能的,在这种情况下我们只想取消上游订阅。

extension SteppingSubscriber: Subscriber 
    public func receive(subscription: Subscription) 
        let action: () -> () = lock.sync 
            guard case .subscribing(let stepper) = l_state else 
                return  subscription.cancel() 
            
            l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
            return  subscription.request(.max(1)) 
        
        action()
    

请注意,在lock.sync 的这次使用中(以及以后的所有使用中),我返回了一个“动作”闭包,这样我就可以在锁被解锁后执行任意调用。

我们要解决的下一个Subscriber 协议要求是接收完成:

    public func receive(completion: Subscribers.Completion<Failure>) 
        let action: (() -> ())? = lock.sync 
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream already completed (and shouldn't call this again),
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
            //   me again concurrently.
            guard case .subscribed(let subscribed) = l_state else 
                return nil
            
            l_state = .dead
            return  [stepper = subscribed.stepper] in
                stepper(.completion(completion))
            
        
        action?()
    

对我们来说最复杂的Subscriber 协议要求是接收Input

我们必须创建一个承诺。 我们必须将承诺传递给步进器。 步进器可以在返回之前完成承诺。 stepper 返回后,我们要检查它是否完成了.more 的promise,如果是,则向上游返回适当的需求。

由于我们必须在这项工作的中间调用步进器,所以我们有一些难看的 lock.sync 调用嵌套。

    public func receive(_ input: Input) -> Subscribers.Demand 
        let action: (() -> Subscribers.Demand)? = lock.sync 
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream completed and shouldn't call this,
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to
            //   signal me again concurrently.
            guard case .subscribed(var subscribed) = l_state else 
                return nil
            

            let promiseId = l_nextPromiseId
            l_nextPromiseId += 1
            let promise: Promise =  request in
                self.completePromise(id: promiseId, request: request)
            
            subscribed.validPromiseId = promiseId
            l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
            return  [stepper = subscribed.stepper] in
                stepper(.input(input, promise))

                let demand: Subscribers.Demand = self.lock.sync 
                    // The only possible states now are .stepping and .dead.
                    guard case .stepping(let stepping) = self.l_state else 
                        return .none
                    
                    self.l_state = .subscribed(stepping.subscribed)
                    return stepping.shouldRequestMore ? .max(1) : .none
                

                return demand
            
        

        return action?() ?? .none
    
 // end of extension SteppingSubscriber: Publisher

我们的订阅者需要处理的最后一件事是完成一个承诺。这很复杂有几个原因:

我们希望防止承诺被多次完成。 我们希望防止旧承诺的完成。 承诺完成后,我们可以处于任何状态。

因此:

extension SteppingSubscriber 
    private func completePromise(id: PromiseId, request: Request) 
        let action: (() -> ())? = lock.sync 
            switch l_state 
            case .dead, .subscribing(_): return nil
            case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
                subscribed.validPromiseId = noPromiseId
                l_state = .subscribed(subscribed)
                return  [sub = subscribed.subscription] in
                    sub.request(.max(1))
                
            case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return  [sub = subscribed.subscription] in
                    sub.cancel()
                
            case .subscribed(_):
                // Multiple completion or stale promise.
                return nil
            case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
                stepping.subscribed.validPromiseId = noPromiseId
                stepping.shouldRequestMore = true
                l_state = .stepping(stepping)
                return nil
            case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return  [sub = stepping.subscribed.subscription] in
                    sub.cancel()
                
            case .stepping(_):
                // Multiple completion or stale promise.
                return nil
            
        

        action?()
    

哇!

完成所有这些后,我们就可以编写真正的step 运算符了:

extension Publisher 
    func step(with stepper: @escaping (SteppingSubscriber<Output, Failure>.Event) -> ()) -> AnyCancellable 
        let subscriber = SteppingSubscriber<Output, Failure>(stepper: stepper)
        self.subscribe(subscriber)
        return .init(subscriber)
    

然后我们可以从上面尝试pace 运算符。由于我们在SteppingSubscriber 中不做任何缓冲,而且上游通常没有缓冲,我们将在上游和pace 运算符之间添加一个buffer

    var cans: [AnyCancellable] = []

    func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool 
        let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
            .merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .handleEvents(
                receiveOutput:  print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") ,
                receiveCompletion:  print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") 
        )
            .makeConnectable()

        let subject = PassthroughSubject<String, Never>()

        cans += [erratic
            .buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
            .pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]

        cans += [subject.sink(
            receiveCompletion:  print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") ,
            receiveValue:  print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \($0)") 
            )]

        let c = erratic.connect()
        cans += [AnyCancellable  c.cancel() ]

        return true
    

最后,这里是输出:

erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
时间戳以秒为单位。 不稳定的出版商的时间确实不稳定,有时甚至很接近。 即使发生不稳定事件的时间间隔不到一秒,起搏计时也始终相隔至少一秒。 当不稳定事件发生在前一个事件之后超过一秒时,起搏事件会在不稳定事件之后立即发送,不会再延迟。 有节奏的完成发生在最后一个有节奏的事件之后一秒,即使不稳定的完成发生在最后一个不稳定的事件之后。 buffer 不会发送完成,直到它在发送最后一个事件后收到另一个请求,并且该请求被起搏计时器延迟。

我已将 step 运算符的整个实现放在 this gist 中,以便于复制/粘贴。

【讨论】:

是的宝贝!事实上,正如我所说(在现在已删除的评论中),需要自定义订阅者来执行 OP 想要的操作。 哇。这很有趣,也很精彩,也帮助我理解了buffer 的目的,如果下游订阅者的需求低于.unlimited,它可以缓冲事件。我刚刚开始探索编写自定义发布者,但我没有走到这一步。太感谢了!如果你写博客,也许这可以/应该成为一篇博文! 我确实有一个问题。您提供的实现需要用户提供一个单独的主题来充当下游。因此,它不是像upstream.buffer(...).pace(...).sink(...) 这样的单个完整管道,而是变成upstream.buffer(...).pace(..., subject: subject)subject.sink(...) 这仅仅是您为了简单而做出的选择,还是您的实现忽略了让 pace 直接返回另一个发布者的可能性,以便可以构建单个管道? 为了简单起见,我做出了这个选择。如果我们不使用主题,除了实现Subscriber协议之外,我们还必须实现Publisher协议。 我正在尝试采用您的解决方案并对其进行修改以做到这一点,主要是因为最终结果看起来更 Swift-y(或 Combine-y?)。如果我遇到麻烦,介意我请你帮忙吗?从概念上讲,这似乎不应该是困难的 - 将 pace / step 自定义订阅者移动到我的新发布者类型的内部,并公开另一个发布者来代替外部主题以发送事件。但我对泛型不太满意,而且我从未编写过自定义 Publisher,所以我可能会遇到麻烦。【参考方案2】:

只是想提一下,我改编了 Rob 之前的答案并将其转换为自定义发布者,以便允许一个完整的管道(请参阅他的解决方案下方的 cmets)。我的改编如下,但所有的功劳都归于他。它还使用 Rob 的 step 运算符和 SteppingSubscriber,因为此自定义 Publisher 在内部使用它们。

编辑:使用缓冲区作为modulated 运算符的一部分进行更新,否则需要附加缓冲区以缓冲上游事件。

public extension Publisher 
    func modulated<Context: Scheduler>(_ pace: Context.SchedulerTimeType.Stride, scheduler: Context) -> AnyPublisher<Output, Failure> 
        let upstream = buffer(size: 1000, prefetch: .byRequest, whenFull: .dropNewest).eraseToAnyPublisher()
        return PacePublisher<Context, AnyPublisher>(pace: pace, scheduler: scheduler, source: upstream).eraseToAnyPublisher()
    


final class PacePublisher<Context: Scheduler, Source: Publisher>: Publisher 
    typealias Output = Source.Output
    typealias Failure = Source.Failure

    let subject: PassthroughSubject<Output, Failure>
    let scheduler: Context
    let pace: Context.SchedulerTimeType.Stride

    lazy var internalSubscriber: SteppingSubscriber<Output, Failure> = SteppingSubscriber<Output, Failure>(stepper: stepper)
    lazy var stepper: ((SteppingSubscriber<Output, Failure>.Event) -> ()) = 
        switch $0 
        case .input(let input, let promise):
            // Send the input from upstream now.
            self.subject.send(input)

            // Wait for the pace interval to elapse before requesting the
            // next input from upstream.
            self.scheduler.schedule(after: self.scheduler.now.advanced(by: self.pace)) 
                promise(.more)
            

        case .completion(let completion):
            self.subject.send(completion: completion)
        
    

    init(pace: Context.SchedulerTimeType.Stride, scheduler: Context, source: Source) 
        self.scheduler = scheduler
        self.pace = pace
        self.subject = PassthroughSubject<Source.Output, Source.Failure>()

        source.subscribe(internalSubscriber)
    

    public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input 
        subject.subscribe(subscriber)
        subject.send(subscription: PaceSubscription(subscriber: subscriber))
    


public class PaceSubscription<S: Subscriber>: Subscription 
    private var subscriber: S?

    init(subscriber: S) 
        self.subscriber = subscriber
    

    public func request(_ demand: Subscribers.Demand) 

    

    public func cancel() 
        subscriber = nil
    

【讨论】:

【参考方案3】:

Publishers.CollectByTime 在这里的某个地方有用吗?

Publishers.CollectByTime(upstream: upstreamPublisher.share(), strategy: Publishers.TimeGroupingStrategy.byTime(RunLoop.main, .seconds(1)), options: nil)

【讨论】:

我没试过,但是CollectByTimeOutput类型是[Upstream.Output],即上游的Output的数组。这向我表明,CollectByTime 将收集给定步幅内的所有值,然后将它们作为单个数组发出。因此,如果我在 1 毫秒内收到 3 个值,并且我想每 1 秒发出一次,那么在 1 秒后,这个发布者将发出一个包含所有 3 个值的数组,而不是每个单独的值每秒一次。 是的,它肯定不会自己做你想做的事。但它在整体解决方案中可能很有用。例如解包数组并通过throttle 或其他方式单独发出它们。 (另外,我不肯定数组本身是按照它们在上游发出的顺序,需要验证)。【参考方案4】:

编辑

对于下面概述的原始方法,还有一种更简单的方法,它不需要起搏器,而是使用flatMap(maxPublishers: .max(1)) 创建的背压。

flatMap 发送请求 1,直到它返回的发布者(我们可以延迟)完成。我们需要上游的 Buffer 发布者来缓冲这些值。

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let interval = 1.0

let pub = subject
   .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
   .flatMap(maxPublishers: .max(1)) 
      Just($0)
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
   

原创

我知道这是一个老问题,但我认为有一种更简单的方法可以实现这一点,所以我想分享一下。

这个想法类似于 .zipTimer,除了 Timer 之外,您将 .zip 与先前发送的值的延时“滴答”,这可以通过一个CurrentValueSubject。需要CurrentValueSubject 而不是PassthroughSubject 才能播种第一个“tick”。

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()

let pacer = CurrentValueSubject<Void, Never>(())
let interval = 1.0

let pub = subject.zip(pacer)
   .flatMap  v in
      Just(v.0) // extract the original value
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
        .handleEvents(receiveOutput:  _ in 
           pacer.send() // send the pacer "tick" after the interval
        ) 
   

发生的情况是,起搏器上的 .zip 门控仅在先前发送的值延迟后到达。

如果下一个值早于允许的间隔,它会等待起搏器。 但是,如果下一个值稍后出现,那么起搏器已经有一个新值可以立即提供,因此不会有延迟。


如果你像在测试用例中那样使用它:

let c = pub.sink  print("\($0): \(Date())") 

subject.send(Date())
subject.send(Date())
subject.send(Date())

DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) 
   subject.send(Date())
   subject.send(Date())


DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) 
   subject.send(Date())
   subject.send(Date())

结果会是这样的:

2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000

【讨论】:

巧妙!谢谢!

以上是关于Swift Combine:缓冲上游值并以稳定的速率发出它们?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Combine + Swift 复制 PromiseKit 风格的链式异步流

使用 Swift 和 Combine 链接 + 压缩多个网络请求

Xcode 11 中 Swift Combine.framework 的可选链接

Swift/Combine - 将过滤后的对象分配给类的属性

Swift Combine:没有“distinct”运算符?

Swift Combine - @Published 属性数组