Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组
Posted
技术标签:
【中文标题】Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组【英文标题】:Rxjs: Combine observables data streams(grpc service) into an array after X secs using scan or mergeMap or any rxjs 【发布时间】:2021-12-14 07:01:15 【问题描述】:我在 2 秒内收到一个数据流作为来自服务 (grpc) 的响应,例如 5 个流。每个流都是我订阅的可观察对象。一旦我收到每个流,处理逻辑就更多了,每个流都有一些沉重的 json 对象,其中包含类似 base64 编码的字节字符串,涉及 json 到复杂类型的转换和更多的条件逻辑。因此,当一个流的处理逻辑完成时,我无法验证其他流,因为它的处理速度非常快(像并发一样接近)并且被遗漏了。所以我需要在监听 X 秒后将所有这些 Y(例如 5)个流聚合到一个可观察的数组中。 下面是代码sn-p。
MyTestService.ts:
@Injectable()
export class MyTestService
client: GrpcMyTestServiceClient;
public readonly coreData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient)
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.coreData$ = this.listCoreStream();
listCoreStream(): Observable<TestReply.AsObject>
return new Observable(obs =>
const req = new SomeRequest();
//Get stream data from service(grpc)
const stream = this.client.getCoreUpdates(req);
stream.on('data', (message: any) =>
obs.next(message.toObject() as TestReply.AsObject);
);
);
MyComponent.ts
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: MyTestService)
this._subscription = new Subscription();
ngOnInit(): void
this._subscription = this._MyTestService.coreData$.subscribe((data) =>
if (data)
let obj = JSON.parse(data);
//processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
);
由于数据太快,跨度太短,没有全部记录或处理。它看起来像同步问题,这在网络世界中是不可能的,满足某些条件的最后一个流会覆盖前一个流。为了避免这种情况并一次处理所有流,我需要将所有流组合/合并到一个数组中,以便我可以在订阅 observable 的组件中迭代它们中的每一个。流数据排序无关紧要。
我尝试使用 rxjs 运算符,例如 timer
、mergemap
、concatmap
、scan
、merge
。但是这些仍然是新手,无法找到正确的方法。下面是使用scan
的一种尝试,但无法获得所需的结果,并且数组具有空值,并且不确定如何使用console.log
中的该数组。
任何指针都会很有帮助,请提出建议。
解决方案尝试:
let temp: TestReply.AsObject[] = [];
let test = this._MyTestService.coreData$
.pipe(
mergeMap(_ => timer(5000)),
scan<any>((allResponses, currentResponse) =>
[...allResponses, currentResponse], this.temp),
).subscribe(console.log);
【问题讨论】:
【参考方案1】:这是我的解决方案,使用订阅中的下一个块将所有数组推送到一起,然后最终执行完整块中的操作。
MyTestService.ts:
@Injectable()
export class MyTestService
client: GrpcMyTestServiceClient;
public readonly coreData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient)
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.coreData$ = this.listCoreStream();
listCoreStream(): Observable<TestReply.AsObject>
return new Observable(obs =>
const req = new SomeRequest();
//Get stream data from service(grpc)
const stream = this.client.getCoreUpdates(req);
stream.on('data', (message: any) =>
obs.next(message.toObject() as TestReply.AsObject);
);
);
stream.on('end', (message: any) =>
obs.complete();
);
);
我的组件.ts
public testReply?: TestReply.AsObject;
public dataArray = [];
private _subscription: Subscription;
constructor(private readonly _MyTestService: MyTestService)
this._subscription = new Subscription();
ngOnInit(): void
this._subscription = this._MyTestService.coreData$.subscribe(
next: (data) =>
if (data)
this.dataArray.push(JSON.parse(data));
,
complete: () =>
// this.dataArray
// above will be your final data
// processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
)
【讨论】:
感谢您的帮助,但由于我需要继续收听来自 grpc 服务的服务流,我无法在您添加的流端(在下面的代码中)使用 obs.complete()。 .I,如果网络断开或未处理的问题导致流结束,则执行 obs.error()。所以有了这个,如果我是正确的,我不会在我的组件订阅中点击完整的块。 stream.on('end', (message: any) => obs.error(); // 我这里没有 obs.complete() ); ); @mv_05 没问题,祝你好运!以上是关于Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组的主要内容,如果未能解决你的问题,请参考以下文章
[RxJS] Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through(代码片段