如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable

Posted

技术标签:

【中文标题】如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable【英文标题】:How to remove transient events from RxJS combineLatest with NGRX store source observable 【发布时间】:2019-12-18 17:49:50 【问题描述】:

我正在使用 TypeScript、Angular、NGRX 应用程序。我一直在编写状态可观察对象而不使用选择器——主要原因是我发现它们不如直接使用 RxJS 运算符强大。例如,不能单独使用选择器来限制事件的发射 - 而是必须使用过滤运算符。

在大多数情况下,用 observables 替换选择器没有任何问题 - observables 可以以与选择器相同的方式组成 - 除了一个例外:我不知道如何组成 observables 可能由相同的触发行动。通常,我使用 combineLatest 作为我的 goto observable composer;然而,如果两个 observables 会更新同一个 action,就会有一个短暂的更新,其中一个 observables 具有来自新状态的值,而另一个具有来自先前状态的值。

最初,我考虑使用 zip observable 创建器来代替;然而,虽然这解决了两个 observable 一起更新时的问题,但它并不能解决一个 observable 在没有另一个 observable 的情况下更新的问题——这在 NGRX 架构中是完全可能的。

然后我考虑了 auditTime(0) 运算符,它确实解决了删除瞬态更新的问题,但有新问题 1) 它会导致 observables 在稍后的事件循环中发出,这会破坏应用程序内部的一些假设(可解决,但很烦人) 2)它使各种可观察对象尽快发射,而我希望所有可观察对象在同一个存储脉冲上一起发射。从图形上看,这意味着应用程序的不同部分的渲染是交错的,而不是在同一帧上一起绘制(请注意,我们的应用程序数据量很大,并且经常需要在存储脉冲上丢帧)

最后,我编写了一个自定义操作符来组合来自同一来源的 observables

export type ObservableTuple<TupleT extends any[]> = 
  [K in keyof TupleT]: Observable<TupleT[K]>;
;

export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> 
  return (source$: Observable<SourceT>) => source$.pipe(
    withLatestFrom(combineLatest<TupleT>(inputs)),
    map(([, values]) => values),
  );

这里总结一下TypeScript中的问题(使用NGRX、RxJS、Angular的sn-ps)

interface IState 
    foo: string;
    bar: string;


@Injectable()
class SomeService 
    constructor(store$: Store<IState>) 
    

    readonly foo$ = this.store$.pipe(select(state => state.foo));
    readonly bar$ = this.store$.pipe(select(state => state.bar));

    readonly composed$ = this.store$.pipe(
        selectFrom(
            this.foo$,
            this.bar$,
        ),
        map(([foo, bar]) => `$foo - $bar`),
    );


const UPDATE_FOO = 
    type: 'update foo',
    foo: 'some updated value for foo'
;
const UPDATE_BAR = 
    type: 'update bar',
    bar: 'some updated value for bar',
;
const UPDATE_BOTH = 
    type: 'update both',
    both: 'some updated value for both foo and bar',
;

即使 selectFrom 调用相互嵌套,也可以正常工作,例如

readonly composed2$ = this.store$.pipe(
   selectFrom(
      this.composed$,
      this.foo$
   )
)

只要在composed2$之前定义commodated$,一切都会顺利;但是,我没有考虑的一个情况是在组合 $ 和组合 2 $ 之间使用 switchMap 之类的运算符时。在这种情况下,由于 compsed2$ 被 switchMap 销毁并重新创建,commodated2$ 可能在composed$ 之前触发,这会导致一切不同步

【问题讨论】:

【参考方案1】:

对于您尝试组合 2 个可观察对象并且仅在它们都完成发射后才发射的特定问题,您可以尝试利用:

queue Scheduler - 让您推迟递归调用,直到当前调用完成 debounce - 延迟更新直到信号到达 observeOn - 只监听队列调度器上的存储更新

然后您可以执行以下操作:

readonly queuedStore$ = this.store$.pipe(
    observeOn(queue), // use queue scheduler to listen to store updates
    share() // use the same store event to update all of our selectors
  );

// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));

// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(foo$, bar$).pipe(
  debounce(() => empty().pipe(observeOn(queue))));

会发生什么?

Foo 更新

    queuedStore$ 在queue 上安排通知 通知立即开始,因为当前没有运行任何内容 foo$ 通知 combineLatest 通知 debounce 订阅 durationSelector durationSelector 在queue 上安排通知 未发送通知,因为队列中的操作当前正在运行 调用堆栈展开到第 1 步 队列调度程序运行 durationSelector 通知 去抖动触发并向 UI 发送更新

栏更新

与 Foo 更新相同

BarFoo 更新

    queuedStore$ 在queue 上安排通知 通知立即开始,因为当前没有运行任何内容 foo$ 通知 combineLatest 通知 debounce 订阅 durationSelector durationSelector 在queue 上安排通知 未发送通知,因为队列中的操作当前正在运行 调用堆栈展开到第 3 步 bar$ 通知 combineLatest 通知 debounce 从 foo 通知中丢弃以前的值 debounce 重新订阅durationSelector durationSelector 在queue 上安排通知 未发送通知,因为队列中的操作当前正在运行 调用堆栈展开到第 1 步 队列调度程序运行 durationSelector 通知 去抖动触发并向 UI 发送更新

理论上,这会让你得到你想要的行为: - 单个更新立即应用(在下一个刻度之前) - 组合更新立即应用(在下一个刻度之前) - 组合更新忽略中间结果 - 如果您的组合 observable 使用 switch,应该仍然有效。

注意事项

如果您在 queue 调度程序上处理其中一个通知时调度另一个事件,则该第二个事件的通知将推迟到当前处理程序完成之后。

【讨论】:

这看起来很有希望。我设置了一个虚拟示例,它似乎具有我想要的基本属性。明天我会尝试将它集成到我的应用程序中,看看它是否没有选中任何框。非常感谢您的详细回答!顺便说一句,我认为要注意的事情很好,因为 - 通常我想在下一个状态之前处理当前状态 - 商店(很确定..)已经在 queueScheduler 上观察 这种方法有一些陷阱,但总的来说它满足了我提出的所有要求。非常感谢,这真的很有帮助!

以上是关于如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable的主要内容,如果未能解决你的问题,请参考以下文章

在 RxJS 5.0 中找不到“combineLatest”

Rxjs:Observable.combineLatest vs Observable.forkJoin

ionic - RXJS 错误:rxjs_Observable__.Observable.combineLatest 不是函数

[RxJS] Combining Streams with CombineLatest

使用 combineLatest 和 Rxjs 返回 observable 的结果

使用 takeUntil 和 combineLatest 取消订阅可观察的 rxjs 不起作用