Flutter:将两个 Streams 流式传输到一个屏幕中?

Posted

技术标签:

【中文标题】Flutter:将两个 Streams 流式传输到一个屏幕中?【英文标题】:Flutter : stream two Streams into a single screen? 【发布时间】:2019-01-23 14:34:51 【问题描述】:

我有两个流从两个不同的 api 获取。

Stream<Month> get monthOutStream => monthOutController.stream;
Stream<MySchedule> get resultOutStream => resultController.stream;

我在应用程序的两种不同状态下获取这些数据,开始时的结果和来自用户的某些事件后的几个月。

MyScheduleBloc()
  initialData();


Future initialData() async 
  MySchedule mySchedule = await myScheduleViewModel.importMySchedule(now.id);
  resultController.add(mySchedule);

我的屏幕有一个流构建器

Widget build(BuildContext context) 
final webCalenderBloc = WebCalenderBloc();
return StreamBuilder(
  stream: webCalenderBloc.resultOutStream,
  builder: (BuildContext context , snapdata)
    if(!snapdata.hasData)
      return Center(
        child: CircularProgressIndicator(),
      );
    
    return body(snapdata.data);
   ,
 );

由于主要的小部件构建方法将带有 resultoutstream 的 StreamBuilder 小部件作为流。我在哪里获取另一个流monthoutStream。 我可以在流中获取流吗?在处理两个流时我是否遗漏了任何东西。我不想从monthoutstream构建任何小部件,但想检查其中的数据。

【问题讨论】:

【参考方案1】:

如果需要,您可以嵌套StreamBuilder。没有什么能阻止您执行以下操作:

StreamBuilder(
  stream: stream1,
  builder: (context, snapshot1) 
    return StreamBuilder(
      stream: stream2,
      builder: (context, snapshot2) 
        // do some stuff with both streams here
      ,
    );
  ,
)

如果这对您有意义,另一种解决方案是:流被设计为可合并/转换。您可以制作第三个流,它是后面两个流的合并。

对于复杂的流操作,您最好使用rxdart,因为它提供了一些有用的转换器。

使用 rxdart,两个ObservableStream 的子类)的融合如下:

Observable<bool> stream1;
Observable<String> stream2;

final fusion = stream1.withLatestFrom(stream2, (foo, bar) 
  return MyClass(foo: foo, bar: bar);
);

【讨论】:

感谢您提供有关合并两个流或一个接一个使用的信息。在某些时候,我会遇到第一个流具有数据“真”的情况,然后它必须弹出导航,如果“假”然后转到另一个流。问题出现在小部件正在构建并且必须弹出导航虽然弹出工作但有几秒钟存在错误说小部件正在构建返回一个空值。如何在一个接一个地使用流时解决这个问题。 pop out the Navigation 应该在监听器中,而不是在 streamBuilder 中。【参考方案2】:
Observable.combineLatest2(
        aStream,
        bStream,
        (a, b, c) =>
        a != '' && b != '');

combineLatestN 返回一个组合流

【讨论】:

我试过这个,但没有用。 StreamBuilder 正在返回错误消息,并显示“状态不佳:Stream 已被收听。” @NihadDelic 您可能在接收数据后关闭了流,或者您没有使用 streamController.broadcast() 。【参考方案3】:

我正在使用一种 BLoC,我在其中广播了 Stream&lt;Null&gt;s,它只会在某些事情发生变化时通知听众。有点像 Qt 的信号和插槽。无论如何,对于我想听不止一个流的情况,我做了这门课。它基本上是StreamBuilder,但您可以收听多个流,它会丢弃流中的所有数据。

import 'dart:async';

import 'package:flutter/widgets.dart';

typedef MultiStreamWidgetBuilder<T> = Widget Function(BuildContext context);

// A widget that basically re-calls its builder whenever any of the streams
// has an event.
class MultiStreamBuilder extends StatefulWidget 
  const MultiStreamBuilder(
    required this.streams,
    required this.builder,
    Key? key,
  ) : super(key: key);

  final List<Stream<dynamic>> streams;
  final MultiStreamWidgetBuilder builder;

  Widget build(BuildContext context) => builder(context);

  @override
  State<MultiStreamBuilder> createState() => _MultiStreamBuilderState();


class _MultiStreamBuilderState extends State<MultiStreamBuilder> 
  final List<StreamSubscription<dynamic>> _subscriptions = [];

  @override
  void initState() 
    super.initState();
    _subscribe();
  

  @override
  void didUpdateWidget(MultiStreamBuilder oldWidget) 
    super.didUpdateWidget(oldWidget);
    if (oldWidget.streams != widget.streams) 
      // Unsubscribe from all the removed streams and subscribe to all the added ones.
      // Just unsubscribe all and then resubscribe. In theory we could only
      // unsubscribe from the removed streams and subscribe from the added streams
      // but then we'd have to keep the set of streams we're subscribed to too.
      // This should happen infrequently enough that I don't think it matters.
      _unsubscribe();
      _subscribe();
    
  

  @override
  Widget build(BuildContext context) => widget.build(context);

  @override
  void dispose() 
    _unsubscribe();
    super.dispose();
  

  void _subscribe() 
    for (final s in widget.streams) 
      final subscription = s.listen(
        (dynamic data) 
          setState(() );
        ,
        onError: (Object error, StackTrace stackTrace) 
          setState(() );
        ,
        onDone: () 
          setState(() );
        ,
      );
      _subscriptions.add(subscription);
    
  

  void _unsubscribe() 
    for (final s in _subscriptions) 
      s.cancel();
    
    _subscriptions.clear();
  

使用示例:

class AppWidget extends StatelessWidget 
  @override
  Widget build(BuildContext context) 
    return MultiStreamBuilder(
      streams: [appState.repoListChanged, appState.selectedRepoChanged],
      builder: _buildMain,
    );
  

  Widget _buildMain(BuildContext context) 
    return Scaffold(
      body: Row(
...

我只是写了它,所以我没有太多测试它。我认为理论上你可以制作一个给你状态的,虽然我不确定 Dart 的类型系统是否足够先进,让你不用到处求助于dynamic 就可以做到。

【讨论】:

【参考方案4】:

在我的情况下,我倾向于将多个流合并为一个流,如果它们来自同一类型,那么您可以使用:

import 'package:async/async.dart' show StreamGroup;
...
StreamGroup.merge([stream1,stream2]);

【讨论】:

根据StreamGroup API merge&lt;T&gt;(Iterable&lt;Stream&lt;T&gt;&gt; streams) → Stream&lt;T&gt;,这仅适用于您要合并的所有流具有相同类型T。因此,你不能合并两个完全不同类型的流,例如Stream&lt;A&gt;Stream&lt;B&gt; 我就是这么说的【参考方案5】:

multiple_stream_builder 包:

Widget build(BuildContext context) 
  return StreamBuilder3<int, int, int>(
    streams: Tuple3(stream1, stream2, stream3),
    initialData: Tuple3(0, 0, 0),
    builder: (context, snapshots) 
      return Text(
        'stream1: $snapshots.item1.data - stream2: $snapshots.item2.data - stream3: $snapshots.item3.data',
      );
    ,
  );

multi_stream_builder 包:

Widget build(BuildContext context) 
  return MultiStreamBuilder(
    streams: [stream1, stream2],
    builder: (context, dataList) 
      final stream1Value = dataList[0];
      final stream2Value = dataList[1];
      return Text('$stream1Value, $stream2Value');
    ,
  );

【讨论】:

以上是关于Flutter:将两个 Streams 流式传输到一个屏幕中?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 表转换

Flutter Web 录制音频以流式传输

如何从 Java 8 Streams 中获取 Inputstream?

Flutter WebRTC 从服务器流式传输视频

如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用

流式传输时 Flutter camera 0.8.1 闪烁