如何在链式可观察对象之间传递结果

Posted

技术标签:

【中文标题】如何在链式可观察对象之间传递结果【英文标题】:How to pass results between chained observables 【发布时间】:2020-12-02 19:06:31 【问题描述】:

抽象问题: 每次源 Observable 发出和事件时,都需要触发一系列 API 调用和 Angular 服务。其中一些调用取决于以前的结果。

在我的示例中,源 Observable startUpload$ 触发一系列依赖调用。

使用解构可以这样写:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(( event, headers ) => this.generateUploadId(event, headers)),
      tap(( event, headers, id ) => this.emitUploadStartEvent(id, event)),
      concatMap(( event, headers, id ) => this.createPdfDocument(event, headers, id)),
      concatMap(( event, headers, id, pdfId ) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(( event, headers, id, pdfId, cloudId ) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(( event, headers, id, pdfId, cloudId ) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()

它几乎读起来像是一种命令式方法。但它有一定的问题:

解构链在代码中重复并且越来越长 event, headers, id, pdfId, cloudId 方法(如generateUploadId(event, headers))需要接收所有以前的值,以便它们能够将它们传递到下一个管道,即使方法本身不需要它 需要内部 Observable(在方法中)来映射值,以便进一步的管道阶段可以破坏它们:

_

private closePdf(cloudId, event, headers, id, pdfId) 
    return this.httpClient.post(...,  headers  )
        .pipe(
             //...,
             map(() => ( event, headers, id, pdfId, cloudId ))
        )

如果编译器可以处理样板文件(如async await)来编写如下代码(没有上述问题),那就太好了:

private startUpload(event: StartUploadEvent) 
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  

如何在没有我提到的问题的情况下在链式可观察对象之间传递结果?有没有我错过的 rxjs 概念?

【问题讨论】:

为什么不使用闭包? 我认为闭包也可能对您有所帮助。否则,您真的必须将map() 结果转换为数组或对象,然后对其进行解构。另一种选择是对每个 Observable 使用 toPromise(),然后对每个 Observable 使用 await 【参考方案1】:

您当然不应该让您的方法接受与它们无关的参数!

你的主要问题:

如何在没有我提到的问题的情况下在链式 observables 之间传递结果?

使用单个范围(嵌套管道)

下面的代码等同于您的示例代码,无需传递不必要的属性。先前返回的值可以通过更下游的函数调用访问:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();

注意eventheaders 是如何在下游访问的。不需要将它们传递给不需要它们的函数。

有没有我错过的 rxjs 概念?

也许吧。?不是真的... :-)

诀窍是添加.pipe 以有效地对运算符进行分组,以便他们都可以访问输入参数。

通常,我们尝试将代码保持在.pipe

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/$id`)),
3     map(response => response.data.userName),
4     map(name => `Hello $name!`),
5     tap(greeting => console.log(greeting))
6   );

但那段代码真的和:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/$id`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello $name! (aka User #$id)`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

但是,在第二种情况下,第 4 行可以访问nameid,而在第一种情况下,它只能访问name

注意第一个签名是userId$.pipe(switchMap(), map(), map(), tap())

第二个是:userId$.pipe(switchMap(), tap())

【讨论】:

回顾迄今为止的答案我认为这是最好的方法。它使用标准的 RxJs 运算符,函数不需要传递参数,并且没有任何异常情况或需要。 “嵌套管道”允许访问 nameid 的原因是因为 closure 这是 OP 所要求的。 :) @dmcgrandle 我反其道而行之!由于嵌套管道,关闭工作。从上面的“平面”示例中,4 个不同的运算符中使用了闭包,它们无助于实现目标。只有当你嵌套管道时,闭包才会有用。嵌套管道将多个运算符置于同一个闭包(作用域)中,这是实现此目的的核心。【参考方案2】:

您的方法绝对不应与上下文耦合,也不应考虑将结果映射到特定形状。

RxJS 是关于函数式编程的。在函数式编程中有一种模式,如Adapting Arguments to Parametersref

它允许我们将方法签名与上下文分离。

为了实现这一点,您可以编写取决于 mapcontentMapmergMap 运算符版本的上下文,以便最终解决方案如下所示:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_((event) => this.getAuthenticationHeaders(event), 'headers'),
      map_(( headers ) => this.generateUploadId(headers), 'id'),
      tap(( event, id ) => this.emitUploadStartEvent(id, event)),
      concatMap_(( id ) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(( pdfId ) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(( cloudId ) => this.closePdf(cloudId)),
      tap((id, event, cloudId) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

在这些运算符之后注意_

Stackblitz Example

那些自定义操作符的目标是获取参数对象,通过投影函数,将投影结果添加到原始参数对象中。

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> 
  return map(gatherParams(project, key));


function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> 
  return concatMap(gatherParamsOperator(projection, key));


function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> 
  return mergeMap(gatherParamsOperator(projection, key));


// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P 
  return (params: P) => 
    if (typeof key === 'string') 
      return Object.assign(, params,  [key]: fn(params)  as Record<K, V>);
    

    return params;
  ;


function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> 
  return (params: P) => 
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  ;


function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> 
  return (value: V) => ( [key]: value  as Record<K, V>);

我在这里使用了函数重载,因为有时我们不需要为参数添加额外的键。只有在this.closePdf(...)方法的情况下,参数才应该通过它。

因此,您将获得与之前具有类型安全性的相同版本的解耦版本:

看起来是不是过度设计?

在大多数情况下,您应该遵循 YAGNI(您不需要它)原则。最好不要给现有代码增加更多的复杂性。对于这种情况,您应该坚持一些简单的在运算符之间共享参数的实现,如下所示:

ngOnInit() 
  const params: Partial<Params> = ;
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
  ).subscribe(() => 
    console.log(params)
  );

Params 类型在哪里:

interface Params 
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;

请注意我在作业中使用的括号(params.cloudId = cloudId)

Stackblitz Example


还有很多其他方法,但它们需要改变你使用 rxjs 操作符的流程:

https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1

https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1

【讨论】:

在我看来(虽然看起来很酷)这是过度工程的典型例子。除非它在许多其他地方使用,否则最好避免使用它。 @m4design 无可争辩。这绝对是真的。我添加了一个简化的替代解决方案。 使用param: Partial&lt;Params&gt; 的简化解决方案会导致使用strictNullChecks: true 编译器选项的TS 错误。【参考方案3】:

你可以:

将每个动作的结果分配给可观察对象

根据早期结果链接后续函数调用

这些结果可以通过withLatestFrom在以后的操作调用中重复使用

shareReplay 用于防止后面的withLatestFrom 订阅导致前面的函数重新执行

function startUpload(event$: Observable<string>) 
  const headers$ = event$.pipe(
    concatMap(event => getAuthenticationHeaders(event)),
    shareReplay()
    );

  const id$ = headers$.pipe(
    map(() => generateUploadId()),
    shareReplay()
    );

  const emitUploadEvent$ = id$.pipe(
    withLatestFrom(event$),   // use earlier result
    map(([id, event]) => emitUploadStartEvent(id, event)),
    shareReplay()
    );

   // etc

如上,函数只取需要的参数,没有传递。

演示:https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

这个模式可以通过使用 rxjs 自定义操作符来简化(注意这可以进一步细化,包括打字):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> 
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );

可以这样使用:

function startUpload(event$: Observable<string>) 
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    

演示:https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

【讨论】:

【参考方案4】:

您可以为数据集使用一个对象吗?像这样的:

界面:

export interface Packet 
  event: string;
  headers?: string;
  id?: number;
  pdfId?: number;
  cloudId?: number;

然后在代码中,是这样的:

服务:

  this.startUploadEvent$.pipe(
    concatMap(packet => this.doThingOne(packet)),
    map(packet => this.doThingTwo(packet)),
    tap(packet => this.doThingThree(packet)),
    // ...
  );

这样,每个方法都可以使用它需要的对象的位并传递其余的位。虽然这确实需要更改每个方法来接收和处理对象。

【讨论】:

这增强了可读性,因为它减少了解构。但是每个doThing 方法仍然接收到一个他们不需要的参数并返回一个与其方法上下文无关的值。举个例子:我们不会像private calcAge(person: Person): Person ... 这样设计一种方法来计算一个人的年龄。相反,我们会像这样设计它private calcAge(birthdate: date): number ...。哪个更通用,易于测试,并且不与它运行的上下文耦合。 那么,如上所述,嵌套管道可能就是您要寻找的。​​span> 【参考方案5】:

据我了解,您关心的是可读性,而不必在方法之间携带有效负载。

你有没有想过将 Observable 转换为 Promise?这里重要的是,observables 必须完成,这样才能实现 promise 并可以解决(与 complete 相同,但仅适用于 promise)。

根据您的建议,见上文(如使用 async await)我提出了这个建议。

private async startUpload(event: StartUploadEvent) 
    const headers = await this.getAuthenticationHeaders(event).toPromise();
    const id = await this.generateUploadId().toPromise();
    
    this.emitUploadStartEvent(id, event);
    
    const pdfId = await this.createPdfDocument(event, headers, id).toPromise();
    await this.uploadBilderForPdf(event, pdfId, headers, id).toPromise();
    
    const cloudId = await this.closePdf(headers, pdfId).toPromise();
    this.emitUploadDoneEvent(id, event, cloudId)
    
    return cloudId

信息:您可以在此处阅读如果在未完成 observable 的情况下将 observable 转换为 Promise 会发生什么:Why converted promise from Subject (Observable) does not work as expected

注意:我正在满足您的期望

也许还有其他不违反常见最佳实践的方法来解决问题

【讨论】:

这就是我对承诺的想法。对我来说,这是最具可读性和实用性的解决方案。但是当我回退到(无样板文件:))承诺时,我感觉错过了一些东西,或者不理解 rxjs 的完整概念。 我也在考虑同样的方法。在我看来,如果一个常见的最佳实践使您的代码难以阅读或理解,或者使您将不必要的参数传递给不需要它们的函数,那么它不再是您的案例的“最佳实践”。 @serkan-sepahi 的解决方案看起来简单易行,这就是我想要的方式。【参考方案6】:

您对此类代码产生的问题是正确的,抽象的解决方案是将合并结果并将正确参数传递给每个调用的责任从方法转移到管道。

可以很容易地完成一些改进。 tap 运算符不会修改值,因此您可以从解构中删除不需要的属性。 map 只是转换结果,所以改为

map(( event, headers ) => this.generateUploadId(event, headers)),

我们可以写

map(( event, headers ) => (
  event,
  headers,
  id: this.generateUploadId(event, headers)
))

并且this.generateUploadId 不再需要返回对象。

至于高阶映射运算符,我想到了几个选项。 首先,大多数 'xMap' 运算符都支持将结果选择器作为最后一个参数,其目的正是我们所需要的——将源值与结果结合起来。结果选择器是depricated,因此嵌套管道是当前的方式,但让我们看看使用结果选择器会是什么样子

选项 0. 结果选择器(已弃用)

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event),
      (event, headers) => ( event, headers ) // <-- Result Selector
    )
  );

选项 1. 嵌套管道(又名“使用闭包”)

它看起来与选项 0 非常相似,但 event 保留在闭包中,而不是内部 observable。

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event)
        .pipe(map(headers => ( event, headers )))
    )
  );

选项 2. 自定义运算符(这里也有闭包)

可以制作自定义运算符并获得与结果选择器非常相似的语法

function withResultSelector(operator, transformer) 
  let sourceValue;
  return pipe(
    tap(value => (sourceValue = value)),
    operator,
    map(value => transformer(sourceValue, value))
  );

用法:

this.startUploadEvent$
  .pipe(
    withResultSelector(
      concatMap(event => this.getAuthenticationHeaders(event)),
      (event, headers) => ( event, headers )
    )
  );

更进一步,可以提取重复的内容并使所有内容更实用:

const mergeAs = propName => (a, b) => ( ...a, [propName]: b );
const opAndMergeAs = (operator, propName) => withResultSelector(operator, mergeAs(propName));

this.startUploadEvent$
  .pipe(
    opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)), "headers")
  );

为此编写适当的类型可能有点麻烦,但这是一个不同的问题

Playground我曾经写过答案。

【讨论】:

很遗憾resultSelector 已被弃用,它有助于分离实际处理和为下一阶段准备结果。它认为选项 1 中的 map(...) 在第二个 pipe 中丢失了 同意,resultSelector 具有特定的语义,看起来很自然,但经过一些练习和改变思维方式后应该可以习惯嵌套管道。【参考方案7】:

您提到的这些顾虑和问题是对的,但我在这里看到的问题是将您的思维方式从命令式方法转变为反应式/函数式方法,但让我们先回顾一下命令式代码

private startUpload(event: StartUploadEvent) 
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId

在这里,您可以看到 event 的内容更加干净,您可以传递并只获取您想要的内容并将其传递给下一个函数,我们希望将此代码移至反应式/函数式方法。

从我的角度来看,主要问题是你让你的函数失去了他们所拥有的上下文,例如 getAuthenticationHeaders 根本不应该返回 event 它应该只返回 headers 并且对于其他函数也是如此。

在处理 RxJS(又名反应式方法)时,您会经常处理这些问题,这没关系,因为它保持了功能概念的应用,并使您的代码更具可预测性,因为 pure 运算符应该只同时处理数据管道保持一切纯净,不会导致副作用,从而导致不可预测的代码。

我认为你在寻找什么可以用nested pipes 解决(这是我认为最好的解决方案)

concatMap(event => this.getAuthenticationHeaders(event).pipe(
    map(headers => this.generateUploadId(event, headers).pipe())
))

它在一些 RxJS 后端库中被大量使用,例如 Marble.js

你可以使用类似于Result Selector的方法:

concatMap(event => this.getAuthenticationHeaders(event).pipe(
  map(headers => ( headers, event ))
)),

或者人们建议的其他很棒的解决方案可以让它工作,但你仍然会遇到你提到的同样的问题,但代码更干净/可读。

您也可以将其转换为 async/await 方法,但您将失去 RxJS 提供给您的反应性。

我的建议是尝试阅读更多关于反应式编程以及如何将你的思维转移到那个方面的信息,我将在此处提供一些链接,我认为这些链接非常适合开始并尝试一些基于此构建的库像 CycleJS 这样的 RxJS 的顶部,我建议阅读有关函数式编程的内容,这对这本很棒的书籍 Mostly adequate guide to FP (in javascript) 也有很大帮助 &Composing Software.

我推荐这个很棒的 Talk RxJS Recipes,它将改变你使用 RxJS 的方式。

有用的资源:

https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 https://www.youtube.com/watch?v=vS1-jzngpmw https://www.youtube.com/watch?v=uQ1zhJHclvs https://egghead.io/lessons/rxjs-organize-code-in-the-main-and-effect-functions https://www.youtube.com/watch?v=XKfhGntZROQ

【讨论】:

以上是关于如何在链式可观察对象之间传递结果的主要内容,如果未能解决你的问题,请参考以下文章

React 如何订阅在另一个组件中声明的可观察对象

Android:如何在包含其他对象的活动之间传递对象

如何将可观察值传递给@Input() Angular 4

如何使剔除可观察数组中的对象属性可观察?

如何在链式函数中将选定元素传递给“this”

如何在 Flutter 中设置 Firebase Analytics 自定义事件,而不在每个屏幕中传递“分析/观察者”对象