Flutter-Stream使用
Posted 一叶飘舟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flutter-Stream使用相关的知识,希望对你有一定的参考价值。
前言
在 Flutter 中有两种处理异步操作的方式 Future
和 Stream
,Future
用于处理单个异步操作,Stream
用来处理连续的异步操作。比如往水杯倒水,将一个水杯倒满为一个 Future
,连续的将多个水杯倒满就是 Stream
。
Stream
就是事件流或者管道,事件流相信大家并不陌生,简单的说就是:基于事件流驱动设计代码,然后监听订阅事件,并针对事件变换处理响应。
Flutter 中,整个 Stream
设计外部暴露的对象主要如下图,主要包含了 StreamController
、Sink
、Stream
、StreamSubscription
四个对象。
有一个事件源叫 Stream
,为了方便控制 Stream
,官方提供了使用 StreamController
作为管理;同时它对外提供了 StreamSink
对象作为事件输入口,可通过 sink
属性访问; 又提供 stream
属性提供 Stream
对象的监听和变换,最后得到的 StreamSubscription
可以管理事件的订阅。
所以我们可以总结出:
- StreamController :如类名描述,用于整个
Stream
过程的控制,提供各类接口用于创建各种事件流。 - StreamSink:一般作为事件的入口,提供如
add
,addStream
等。 - Stream:事件源本身,一般可用于监听事件或者对事件进行转换,如
listen
、where
。 - StreamSubscription:事件订阅后的对象,表面上用于管理订阅过等各类操作,如
cacenl
、pause
,同时在内部也是事件的中转关键。
Stream分类
流可以分为两类:
- 单订阅流(Single Subscription),这种流最多只能有一个监听器(listener)
- 多订阅流(Broadcast),这种流可以有多个监听器监听(listener)
Stream创建与监听的方式
- periodic创建,listen监听
- streamController创建,listen或 StreamBuilder监听
Controller.sink.add添加事件
addError
Controller.close关闭水龙头
- 创建
StreamController
, - 然后获取
StreamSink
用做事件入口, - 获取
Stream
对象用于监听, - 并且通过监听得到
StreamSubscription
管理事件订阅,最后在不需要时关闭
- streamController.broadcast广播
可以在多处监听它返回的结果
Stream.asBroadcastStream() 可以将一个单订阅模式的 Stream 转换成一个多订阅模式的 Stream
- async*
在streamController传入使用
Stream常用操作符
-
map 包装转化
-
where 过滤
-
distinct去重
-
Stream.take(int count)
上面创建了一个无限每隔一秒发送一次事件的流,如果我们想指定只发送10个事件则,用take。下面就只会打印出0-9
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); //指定发送事件个数 await for(int i in stream ) print(i);
-
Stream.takeWhile
上面这种方式我们是只制定了发送事件的个数,如果我们也不知道发送多少个事件,我们可以从返回的结果上做一个返回值的限制,上面结果也可以用以下方式实现
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); // stream = stream.take(10); stream = stream.takeWhile((data) return data < 10; ); await for (int i in stream) print(i);
-
Stream.skip(int count)
skip可以指定跳过前面的几个事件,如下会跳过0和1,输出 2-9;
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); stream = stream.skip(2); await for (int i in stream) print(i);
-
Stream.skipWhile
可以指定跳过不发送事件的指定条件,如下跳过0-4的输出,输出5-9
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); stream = stream.skipWhile((data) => data<5); await for (int i in stream) print(i);
-
Stream.toList()
将流中所有的数据收集存放在List中,并返回 Future对象,listData里面 0-9
1.这个是一个异步方法,要结果则需要使用await关键字
2.这个是等待Stream当流结束时,一次返回结果
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); List<int> listData = await stream.toList(); for (int i in listData) print(i);
-
Stream. listen()
这是一种特定的可以用于监听数据流的方式,和 forEach循环的效果一致,但是返回的是
StreamSubscription<T>
对象,如下也会输出0-9,同时打印出 ”流已完成“看一下源码这种方式可以接收
StreamSubscription<T> listen(void onData(T event), Function onError, void onDone(), bool cancelOnError);
1.
onData
是接收到数据的处理,必须要实现的方法2.
onError
流发生错误时候的处理3.
onDone
流完成时候调取4.
cancelOnError
发生错误的时候是否立马终止void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); stream.listen((data) print(data); , onError: (error) print("流发生错误"); , onDone: () print("流已完成"); , cancelOnError: false);
-
Stream. forEach()
这中操作和
listen()
的方式基本差不多,也是一种监听流的方式,这只是监听了onData
,下面代码也会输出0-9void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); stream.forEach((data) print(data); );
-
Stream .length
用于获取等待流中所有事件发射完成之后统计事件的总数量,下面代码会输出 10
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream.periodic(interval, (data) => data); stream = stream.take(10); var allEvents = await stream.length; print(allEvents);
-
Stream.where
在流中添加筛选条件,过滤掉一些不想要的数据,满足条件返回true,不满足条件返回false,如下我们筛选出流中大于5小于10的数据
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream<int>.periodic(interval, (data) => data); stream = stream.where((data)=>data>5); stream = stream.where((data)=> data<10); await for(int i in stream) print(i);
-
stream.map
对流中的数据进行一些变换,以下是我对Stream的每个数据都加1
void _stream() async Duration interval = Duration(seconds: 1); Stream<int> stream = Stream<int>.periodic(interval, (data) => data); stream = stream.map((data) => data + 1); await for (int i in stream) print(i);
-
Stream.expand
对流中的数据进行一个扩展,如下,会输出1,1,2,2,3,3….
void _stream() async Duration interval = Duration(seconds: 1); Stream stream = Stream.periodic(interval, (data) => data); stream = stream.expand((data)=>[data,data]); stream.listen((data)=>print(data),onError:(error)=> print("发生错误") );
-
Stream.transform
如果我们在在流流转的过程中需要进行一些转换和控制我们则需要使用到transform,接收一个
StreamTransformer<S,T>,S表示转换之前的类型,T表示转换后的输入类型,如下代码我们会接收到三组数字模拟输入了三次密码,并判断真确的密码,同时输出密码正确和密码错误:
void _stream() async var stream = Stream<int>.fromIterable([123456,234567,678901]); var st = StreamTransformer<int, String>.fromHandlers( handleData: (int data, sink) if (data == 678901) sink.add("密码输入正确,正在开锁。。。"); else sink.add("密码输入错误..."); ); stream.transform(st).listen((String data) => print(data), onError: (error) => print("发生错误"));
输入如下结果
I/flutter (18980): 密码输入错误... I/flutter (18980): 密码输入错误... I/flutter (18980): 密码输入正确,正在开锁。。。
Stream 是怎么实现异步的?
这就需要说到 Dart 中的异步实现逻辑了,因为 Dart 是 单线程应用 ,和大多数单线程应用一样,Dart 是以 消息循环机制 来运行的,而这里面主要包含两个任务队列,一个是 microtask 内部队列,一个是 event 外部队列,而 microtask 的优先级又高于 event 。
默认的在 Dart 中,如 点击、滑动、IO、绘制事件 等事件都属于 event 外部队列,microtask 内部队列主要是由 Dart 内部产生,而 Stream
中的执行异步的模式就是 scheduleMicrotask
了。
因为 microtask 的优先级又高于 event ,所以如果 microtask 太多就可能会对触摸、绘制等外部事件造成阻塞卡顿哦。
如下图,就是 Stream 内部在执行异步操作过程执行流程:
以上是关于Flutter-Stream使用的主要内容,如果未能解决你的问题,请参考以下文章