在执行副作用之前等待来自 mergeMap 的所有 observables 完成
Posted
技术标签:
【中文标题】在执行副作用之前等待来自 mergeMap 的所有 observables 完成【英文标题】:Wait for all observables from a mergeMap to complete before performing a side effect 【发布时间】:2021-12-22 16:03:47 【问题描述】:我还是 rxjs 的新手,所以我很难做到这一点。当我的 mergeMap 中的所有 observables 完成时,我想执行一个副作用,但我不知道如何将原始流“拆分”成一个新流。
从这里Wait for all observables in mergeMap to complete before emitting a custom value 看起来我可以使用ignoreElements
和endsWith(undefined)
但我想保留mergeMap 中的原始可观察对象
例如
action$.pipe(
filter(..),
mergeMap(action => nextAction(action)),
performSideEffectWhenMergeMapDone()
我该怎么做?
我也试过这样的:
const next = action$.pipe(
filter(..),
mergeMap(...))
next.pipe(
ignoreElements(),
endWith(undefined),
tap(() => ...
)
);
return next
但是由于第二个史诗从未注册,因此 tap() 永远不会被击中
【问题讨论】:
【参考方案1】:就像订阅一样,点按即可获取observer
(或部分observer
)。
observer
有这个形状:
next: v => ...,
error: e => ...,
complete: () => ...
在这种情况下,实现完整的部分observer
就足够了:
action$.pipe(
filter(..),
mergeMap(action => nextAction(action)),
tap(
complete: performSideEffectWhenMergeMapDone
)
);
【讨论】:
我试过这个,但它似乎不起作用。也许是因为我有另一个嵌套的 mergeMap? “似乎不起作用”更具体是什么意思? 在马丁的回答中留下了评论!【参考方案2】:如果我正确理解您的问题,这一切都是 Redux 操作的一部分吗? endWith()
在这种情况下不会帮助你,因为 endWith()
只会在其源 Observable 完成但 Redux 操作源永远不会完成时发出。
您应该做什么取决于nextAction
是什么。如果它返回一个完成的 Observable,你可以这样做:
action$.pipe(
filter(..),
mergeMap(action => nextAction(action).pipe(
tap(
complete: () => performSideEffectWhenMergeMapDone()
),
)),
换句话说,您将等待mergeMap
中的内部Observble 完成,而不是使用mergeMap
的外部Observable。
【讨论】:
啊,好吧。这是有道理的。这确实是 redux 操作的一部分。将水龙头移到里面是可行的,但也不是我想要的。在这种情况下,nextAction 也是mergeMap
例如 doSomething(action).expand(..).mergeMap(..)
所以点击完成正在触发动作
旁注: const fn = () => gn()
与 const fn = gn
相同。两者在语义上是相同的。虽然实际上第一个可能会稍微慢一些,因为它是两个函数调用而不是一个。它们将始终产生相同的结果。以上是关于在执行副作用之前等待来自 mergeMap 的所有 observables 完成的主要内容,如果未能解决你的问题,请参考以下文章
Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组