如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable
Posted
技术标签:
【中文标题】如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable【英文标题】:How to remove transient events from RxJS combineLatest with NGRX store source observable 【发布时间】:2019-12-18 17:49:50 【问题描述】:我正在使用 TypeScript、Angular、NGRX 应用程序。我一直在编写状态可观察对象而不使用选择器——主要原因是我发现它们不如直接使用 RxJS 运算符强大。例如,不能单独使用选择器来限制事件的发射 - 而是必须使用过滤运算符。
在大多数情况下,用 observables 替换选择器没有任何问题 - observables 可以以与选择器相同的方式组成 - 除了一个例外:我不知道如何组成 observables 可能由相同的触发行动。通常,我使用 combineLatest 作为我的 goto observable composer;然而,如果两个 observables 会更新同一个 action,就会有一个短暂的更新,其中一个 observables 具有来自新状态的值,而另一个具有来自先前状态的值。
最初,我考虑使用 zip observable 创建器来代替;然而,虽然这解决了两个 observable 一起更新时的问题,但它并不能解决一个 observable 在没有另一个 observable 的情况下更新的问题——这在 NGRX 架构中是完全可能的。
然后我考虑了 auditTime(0) 运算符,它确实解决了删除瞬态更新的问题,但有新问题 1) 它会导致 observables 在稍后的事件循环中发出,这会破坏应用程序内部的一些假设(可解决,但很烦人) 2)它使各种可观察对象尽快发射,而我希望所有可观察对象在同一个存储脉冲上一起发射。从图形上看,这意味着应用程序的不同部分的渲染是交错的,而不是在同一帧上一起绘制(请注意,我们的应用程序数据量很大,并且经常需要在存储脉冲上丢帧)
最后,我编写了一个自定义操作符来组合来自同一来源的 observables
export type ObservableTuple<TupleT extends any[]> =
[K in keyof TupleT]: Observable<TupleT[K]>;
;
export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT>
return (source$: Observable<SourceT>) => source$.pipe(
withLatestFrom(combineLatest<TupleT>(inputs)),
map(([, values]) => values),
);
这里总结一下TypeScript中的问题(使用NGRX、RxJS、Angular的sn-ps)
interface IState
foo: string;
bar: string;
@Injectable()
class SomeService
constructor(store$: Store<IState>)
readonly foo$ = this.store$.pipe(select(state => state.foo));
readonly bar$ = this.store$.pipe(select(state => state.bar));
readonly composed$ = this.store$.pipe(
selectFrom(
this.foo$,
this.bar$,
),
map(([foo, bar]) => `$foo - $bar`),
);
const UPDATE_FOO =
type: 'update foo',
foo: 'some updated value for foo'
;
const UPDATE_BAR =
type: 'update bar',
bar: 'some updated value for bar',
;
const UPDATE_BOTH =
type: 'update both',
both: 'some updated value for both foo and bar',
;
即使 selectFrom 调用相互嵌套,也可以正常工作,例如
readonly composed2$ = this.store$.pipe(
selectFrom(
this.composed$,
this.foo$
)
)
只要在composed2$之前定义commodated$,一切都会顺利;但是,我没有考虑的一个情况是在组合 $ 和组合 2 $ 之间使用 switchMap 之类的运算符时。在这种情况下,由于 compsed2$ 被 switchMap 销毁并重新创建,commodated2$ 可能在composed$ 之前触发,这会导致一切不同步
【问题讨论】:
【参考方案1】:对于您尝试组合 2 个可观察对象并且仅在它们都完成发射后才发射的特定问题,您可以尝试利用:
queue Scheduler - 让您推迟递归调用,直到当前调用完成 debounce - 延迟更新直到信号到达 observeOn - 只监听队列调度器上的存储更新然后您可以执行以下操作:
readonly queuedStore$ = this.store$.pipe(
observeOn(queue), // use queue scheduler to listen to store updates
share() // use the same store event to update all of our selectors
);
// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));
// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(foo$, bar$).pipe(
debounce(() => empty().pipe(observeOn(queue))));
会发生什么?
Foo 更新
-
queuedStore$ 在
queue
上安排通知
通知立即开始,因为当前没有运行任何内容
foo$
通知
combineLatest
通知
debounce
订阅 durationSelector
durationSelector 在queue
上安排通知
未发送通知,因为队列中的操作当前正在运行
调用堆栈展开到第 1 步
队列调度程序运行 durationSelector 通知
去抖动触发并向 UI 发送更新
栏更新
与 Foo 更新相同
BarFoo 更新
-
queuedStore$ 在
queue
上安排通知
通知立即开始,因为当前没有运行任何内容
foo$
通知
combineLatest
通知
debounce
订阅 durationSelector
durationSelector 在queue
上安排通知
未发送通知,因为队列中的操作当前正在运行
调用堆栈展开到第 3 步
bar$
通知
combineLatest
通知
debounce
从 foo 通知中丢弃以前的值
debounce
重新订阅durationSelector
durationSelector 在queue
上安排通知
未发送通知,因为队列中的操作当前正在运行
调用堆栈展开到第 1 步
队列调度程序运行 durationSelector 通知
去抖动触发并向 UI 发送更新
理论上,这会让你得到你想要的行为:
- 单个更新立即应用(在下一个刻度之前)
- 组合更新立即应用(在下一个刻度之前)
- 组合更新忽略中间结果
- 如果您的组合 observable 使用 switch
,应该仍然有效。
注意事项
如果您在 queue
调度程序上处理其中一个通知时调度另一个事件,则该第二个事件的通知将推迟到当前处理程序完成之后。
【讨论】:
这看起来很有希望。我设置了一个虚拟示例,它似乎具有我想要的基本属性。明天我会尝试将它集成到我的应用程序中,看看它是否没有选中任何框。非常感谢您的详细回答!顺便说一句,我认为要注意的事情很好,因为 - 通常我想在下一个状态之前处理当前状态 - 商店(很确定..)已经在 queueScheduler 上观察 这种方法有一些陷阱,但总的来说它满足了我提出的所有要求。非常感谢,这真的很有帮助!以上是关于如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable的主要内容,如果未能解决你的问题,请参考以下文章
在 RxJS 5.0 中找不到“combineLatest”
Rxjs:Observable.combineLatest vs Observable.forkJoin
ionic - RXJS 错误:rxjs_Observable__.Observable.combineLatest 不是函数
[RxJS] Combining Streams with CombineLatest