RxJS 序列等价于 promise.then()?

Posted

技术标签:

【中文标题】RxJS 序列等价于 promise.then()?【英文标题】:RxJS sequence equivalent to promise.then()? 【发布时间】:2016-04-04 01:30:32 【问题描述】:

我曾经很有前途地开发了很多东西,现在我正在转向 RxJS。 RxJS 的文档没有提供关于如何从承诺链转移到观察者序列的非常清晰的示例。

例如,我通常写多个步骤的promise链,比如

// a function that returns a promise
getPromise()
.then(function(result) 
   // do something
)
.then(function(result) 
   // do something
)
.then(function(result) 
   // do something
)
.catch(function(err) 
    // handle error
);

我应该如何用 RxJS 风格重写这个 Promise 链?

【问题讨论】:

【参考方案1】:

RxJS 序列等价于 promise.then()?

例如

function getdata1 (argument) 
        return this.http.get(url)
            .map((res: Response) => res.json());
    

    function getdata2 (argument) 
        return this.http.get(url)
            .map((res: Response) => res.json());
    

    getdata1.subscribe((data1: any) => 
        console.log("got data one. get data 2 now");
        getdata2.subscribe((data2: any) => 
            console.log("got data one and two here");
        );
    );

【讨论】:

【参考方案2】:

我就是这样做的。

以前

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) 
    const request = gapi.client.people.people.connections.list(
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    ).then(response => 
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    );
  

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => 
      // handle rsp;
  );

之后(ly?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> 
    return from(
      new Promise((resolve, reject) => 
        gapi.client.people.people.connections.list(
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        ).then(result => 
          resolve(result);
        );
      )
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => 
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    ));
  

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => 
  // handle rsp
), (error) => 
  // handle error
);

【讨论】:

副作用:在将回调转换为 observable 后,更改检测也开始工作。【参考方案3】:

2019 年 5 月更新,使用 RxJs 6

同意上面提供的答案,希望使用 RxJs v6 添加一个带有一些玩具数据和简单承诺(带有 setTimeout)的具体示例以增加清晰度。

只需将传递的 id(当前硬编码为 1)更新为不存在的内容即可执行错误处理逻辑。重要的是,还要注意 ofcatchError 消息的使用。

import  from as fromPromise, of  from "rxjs";
import  catchError, flatMap, tap  from "rxjs/operators";

const posts = [
   title: "I love javascript", author: "Wes Bos", id: 1 ,
   title: "CSS!", author: "Chris Coyier", id: 2 ,
   title: "Dev tools tricks", author: "Addy Osmani", id: 3 
];

const authors = [
   name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" ,
  
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  ,
   name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" 
];

function getPostById(id) 
  return new Promise((resolve, reject) => 
    setTimeout(() => 
      const post = posts.find(post => post.id === id);
      if (post) 
        console.log("ok, post found!");
        resolve(post);
       else 
        reject(Error("Post not found!"));
      
    , 200);
  );


function hydrateAuthor(post) 
  return new Promise((resolve, reject) => 
    setTimeout(() => 
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) 
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
       else 
        reject(Error("Author not Found!"));
      
    , 200);
  );


function dehydratePostTitle(post) 
  return new Promise((resolve, reject) => 
    setTimeout(() => 
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    , 200);
  );


// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => 
    return hydrateAuthor(post);
  ),
  flatMap(post => 
    return dehydratePostTitle(post);
  ),
  catchError(error => of(`Caught error: $error`))
);

source$.subscribe(console.log);

输出数据:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
 author:
    name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' ,
  id: 1 

关键部分,使用plain promise control flow相当于如下:

getPostById(1)
  .then(post => 
    return hydrateAuthor(post);
  )
  .then(post => 
    return dehydratePostTitle(post);
  )
  .then(author => 
    console.log(author);
  )
  .catch(err => 
    console.error(err);
  );

【讨论】:

完美的答案,但现在 flatMap 被贬低了!新方法是什么? flatMap -> 合并地图【参考方案4】:

更现代的选择:

import from as fromPromise from 'rxjs';
import catchError, flatMap from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => 
       // do something
   ),
   flatMap(result => 
       // do something
   ),
   flatMap(result => 
       // do something
   ),
   catchError(error => 
       // handle error
   )
)

还请注意,要使所有这些工作,您需要在某处将 subscribe 发送到此管道 Observable,但我认为它已在应用程序的其他部分处理。

【讨论】:

我对 RxJS 很陌生,但考虑到我们在这里只处理 one 事件的初始流,因此 mergeMap() 实际上并不有什么要合并,我相信在这种情况下我们可以使用concatMap()switchMap() 实现完全相同的目标。我说对了吗……?【参考方案5】:

如果我理解正确,您的意思是使用这些值,在这种情况下您使用 sbuscribe 即

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

此外,您可以使用 toPromise() 将 observable 转换为 Promise,如下所示:

arrObservable.toPromise().then()

【讨论】:

【参考方案6】:

据我所知,如果您在 flatMap 中返回结果,即使您返回了字符串,它也会将其转换为数组。

但是如果你返回一个 Observable,那个 observable 可以返回一个字符串;

【讨论】:

【参考方案7】:

如果getPromise 函数位于流管道的中间,您应该简单地将其包装到函数mergeMapswitchMapconcatMap(通常是mergeMap)中:

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

如果你想用getPromise() 开始你的流,那么将它包装到from 函数中:

import from from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);

【讨论】:

【参考方案8】:

对于数据流(相当于then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) 
   // do something
  )
  .flatMap(function(result) 
   // do something
  )
  .subscribe(function onNext(result) 
    // end of chain
  , function onError(error) 
    // process the error
  );

可以使用Rx.Observable.fromPromise 将promise 转换为observable。

一些 promise 操作符有直接翻译。比如RSVP.all,或者jQuery.when可以替换成Rx.Observable.forkJoin

请记住,您有一堆运算符允许异步转换数据,并执行您不能或很难用 Promise 完成的任务。 Rxjs 通过异步数据序列(序列即多于 1 个异步值)展示了它的所有功能。

对于错误管理,主题稍微复杂一些。

还有catch 和finally 运算符 retryWhen 还可以帮助在出现错误时重复序列 您还可以使用onError 函数处理订阅者本身的错误。

要获得精确的语义,请更深入地查看您可以在网络上找到的文档和示例,或者在此处提出具体问题。

这绝对是深入使用 Rxjs 进行错误管理的一个很好的起点:https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

【讨论】:

我总是看到可观察的序列以 subscribe() 结尾。由于这只是可观察对象的功能,有什么理由这样做吗?是启动序列的功能吗? 正是如此。如果没有观察者通过订阅,您的 observable 将不会发出任何数据,因此您不会看到任何数据流。 我建议你看看这个:gist.github.com/staltz/868e7e9bc2a7b8c1f754。它可能比官方文档更可口。 Promise.then.flatMap 而不是 .map 仅供参考,这与第三个then 中的Promise 版本中的错误不完全相同,catch 将捕获该错误。他们不在。

以上是关于RxJS 序列等价于 promise.then()?的主要内容,如果未能解决你的问题,请参考以下文章

promise和Rxjs的一点区别

扁平化 javascript 中的 Promise

Promise和Observable的映射

如何在 Promise.then 中访问范围外的变量(类似于闭包)

rxjs简单入门

rxjs简单入门