从(服务器发送的)EventSource 创建一个 RxJS Observable

Posted

技术标签:

【中文标题】从(服务器发送的)EventSource 创建一个 RxJS Observable【英文标题】:Creating an RxJS Observable from a (server sent) EventSource 【发布时间】:2016-04-24 18:36:25 【问题描述】:

我想从 EventSource(服务器发送的事件)创建一个 RxJs Observable。

我尝试了以下方法:

import Component, OnInit from 'angular2/core';
import Subject, Observable  from 'rxjs/Rx';

@Component(
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string:  s 
        </li>
    </ul>
    `
)
export class AppComponent implements OnInit 

    someStrings:string[] = [];

    ngOnInit() 
        let eventSource = new EventSource('/interval-sse-observable');
        let observable = Observable.create(eventSource);
        observable.subscribe(
            next: aString => this.someStrings.push(aString.data),
            error: err => console.error('something wrong occurred: ' + err)
        );
    

但我得到以下异常:

EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
ORIGINAL STACKTRACE:
TypeError: this._subscribe is not a function
    at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
    at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
    at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
    at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
    at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
    at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
    at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
    at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
    at Array.forEach (native)
    at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)

为了完整起见,这里是我的 index.html 的内容:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse demo</title>
    <!-- 1. Load libraries -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
    <script src="https://code.angularjs.org/tools/system.js"></script>
    <script src="https://code.angularjs.org/tools/typescript.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
    <script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>

    <!-- 2. Configure SystemJS -->
    <script>
        System.config(
            transpiler: 'typescript',
            typescriptOptions:  emitDecoratorMetadata: true 
        );
        System.import('./scripts/app.ts')
                .then(null, console.error.bind(console));
    </script>
</head>
<body>

<my-app>Loading...</my-app>

</body>
</html>

有人可以帮忙吗?

edit 1:按照 Yurzui 的建议,我将代码修改如下:

ngOnInit() 
    let observable = Observable.create(observer => 
        const eventSource = new EventSource('/interval-sse-observable');
        eventSource.onmessage = x => observer.next(console.log(x));
        eventSource.onerror = x => observer.error(console.log('EventSource failed'));

        return () => 
            eventSource.close();
        ;
    );
    observable.subscribe(
        next: aString => this.someStrings.push(aString.data),
        error: err => console.error('something wrong occurred: ' + err)
    );

它确实在控制台中记录第一条消息,如下所示:

MessageEvent isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…bubbles: falsecancelBubble: falsecancelable: falsecurrentTarget: EventSourcedata: "c374a15b-b37d-498e-8ab0-49643b79c1bb"defaultPrevented: falseeventPhase: 0isTrusted: trueisTrusted: truelastEventId: ""origin: "http://localhost:8080"path: Array[0]ports: nullreturnValue: truesource: nullsrcElement: EventSourcetarget: EventSourcetimeStamp: 6257.125type: "message"__proto__: MessageEvent
Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefinedSystem.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29SafeSubscriber.__tryOrUnsub @ Rx.js:10979SafeSubscriber.next @ Rx.js:10934Subscriber._next @ Rx.js:10894Subscriber.next @ Rx.js:10871System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21

现在,如果不是在控制台中记录 x 变量,我只需将其传递给下一个方法,如下所示:

eventSource.onmessage = x => observer.next(x);

客户端检索服务器发送的事件(我在 chrome 开发工具中看到它们)但模板中没有显示任何内容,表明未填充字符串数组...

顺便说一句,我不得不删除 JSON.parse(x.data),因为它会导致错误。

【问题讨论】:

github.com/ReactiveX/rxjs/issues/1644(由你?) 【参考方案1】:

您可以使用以下代码为 EventSource 流手动创建 Observable:

export class AppComponent implements OnInit 
  someStrings:string[] = [];

  constructor(private zone: NgZone) 

  ngOnInit()
    const observable = Observable.create(observer => 
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(x.data);
      eventSource.onerror = x => observer.error(x);

      return () => 
        eventSource.close();
      ;
    );

    this.subscription = observable.subscribe(
      next: guid => 
        this.zone.run(() => this.someStrings.push(guid));
      ,
      error: err => console.error('something wrong occurred: ' + err)
    );
  


// somewhere
// this.subscription.unsubscribe()

别忘了导入 NgZone 类:

import Component, OnInit, NgZone from '@angular/core';

另见Angular2 View Not Changing After Data Is Updated

【讨论】:

谢谢@yurzui,我已经编辑了我的帖子以考虑到您的回复。 奇怪,我试过这个解决方案,它有效。你得到什么错误? 无错误无输出。你有一个工作的 plunker/fiddle 吗? 我添加了 plunker 示例 我会调查的。非常感谢Yurzui。仅供参考,可观察对象的创建将很快在 RxJs 5 中可用,请参阅:github.com/ReactiveX/rxjs/issues/1644【参考方案2】:

我应该完成 Yurzui 的回答:

就我而言,在使用 Angular 6 时,我在将函数分配给 onmessage 时遇到了一些奇怪的行为。因此,我添加了一个事件监听器,它就像一个魅力:

const observable = Observable.create(observer => 
  const eventSource = new EventSource('/interval-sse-observable');
  eventSource.addEventListener("message", (event: MessageEvent) => observer.next(event.data));
  eventSource.addEventListener("error", (event: MessageEvent) => observer.error(event));

  return () => 
    eventSource.close();
  ;
);

【讨论】:

以上是关于从(服务器发送的)EventSource 创建一个 RxJS Observable的主要内容,如果未能解决你的问题,请参考以下文章

EventSource / 服务器通过 Nginx 发送的事件

EventSource 中的 HTTP 授权标头(服务器发送事件)

可以通过 POST 使用 EventSource 传递参数的服务器发送事件 (SSE)

服务器推送的实现—基于EventSource

Eventsource golang:如何检测客户端断开连接?

HTTP2 - 如何拥有类似 WebSocket 的功能(Keep-Alive、EventSource 等)