在执行副作用之前等待来自 mergeMap 的所有 observables 完成

Posted

技术标签:

【中文标题】在执行副作用之前等待来自 mergeMap 的所有 observables 完成【英文标题】:Wait for all observables from a mergeMap to complete before performing a side effect 【发布时间】:2021-12-22 16:03:47 【问题描述】:

我还是 rxjs 的新手,所以我很难做到这一点。当我的 mergeMap 中的所有 observables 完成时,我想执行一个副作用,但我不知道如何将原始流“拆分”成一个新流。

从这里Wait for all observables in mergeMap to complete before emitting a custom value 看起来我可以使用ignoreElementsendsWith(undefined) 但我想保留mergeMap 中的原始可观察对象

例如

action$.pipe(
filter(..),
mergeMap(action => nextAction(action)),
performSideEffectWhenMergeMapDone()

我该怎么做?

我也试过这样的:


  const next = action$.pipe(
    filter(..),
    mergeMap(...))

  next.pipe(
    ignoreElements(),
    endWith(undefined),
    tap(() => ...
    )
  );
  return next

但是由于第二个史诗从未注册,因此 tap() 永远不会被击中

【问题讨论】:

【参考方案1】:

就像订阅一样,点按即可获取observer(或部分observer)。

observer有这个形状:


  next: v => ...,
  error: e => ...,
  complete: () => ...

在这种情况下,实现完整的部分observer 就足够了:

action$.pipe(
  filter(..),
  mergeMap(action => nextAction(action)),
  tap(
    complete: performSideEffectWhenMergeMapDone
  )
);

【讨论】:

我试过这个,但它似乎不起作用。也许是因为我有另一个嵌套的 mergeMap? “似乎不起作用”更具体是什么意思? 在马丁的回答中留下了评论!【参考方案2】:

如果我正确理解您的问题,这一切都是 Redux 操作的一部分吗? endWith() 在这种情况下不会帮助你,因为 endWith() 只会在其源 Observable 完成但 Redux 操作源永远不会完成时发出。

您应该做什么取决于nextAction 是什么。如果它返回一个完成的 Observable,你可以这样做:

action$.pipe(
filter(..),
mergeMap(action => nextAction(action).pipe(
  tap(
    complete: () => performSideEffectWhenMergeMapDone()
  ),
)),

换句话说,您将等待mergeMap 中的内部Observble 完成,而不是使用mergeMap 的外部Observable。

【讨论】:

啊,好吧。这是有道理的。这确实是 redux 操作的一部分。将水龙头移到里面是可行的,但也不是我想要的。在这种情况下,nextAction 也是 mergeMap 例如 doSomething(action).expand(..).mergeMap(..) 所以点击完成正在触发动作 旁注: const fn = () => gn()const fn = gn 相同。两者在语义上是相同的。虽然实际上第一个可能会稍微慢一些,因为它是两个函数调用而不是一个。它们将始终产生相同的结果。

以上是关于在执行副作用之前等待来自 mergeMap 的所有 observables 完成的主要内容,如果未能解决你的问题,请参考以下文章

Jmeter -- 定时器

在父进程恢复执行之前等待所有子进程 UNIX

在执行下一个代码块之前等待多个 RxJS 调用

如何等待来自各种可观察对象的数据执行操作

Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组

jmeter的定时器