如何从使用 Futures/async/await 的流中正确生成?
Posted
技术标签:
【中文标题】如何从使用 Futures/async/await 的流中正确生成?【英文标题】:How can I correctly yield from a stream that uses Futures/async/await? 【发布时间】:2019-07-01 06:08:07 【问题描述】:我遇到了一个奇怪的问题,如果我在我的 Flutter 应用程序中从我的提供程序 yield*,函数中的其余代码不会完成。
我使用的是 BLoC 模式,所以我的 _mapEventToState 函数如下所示:
Stream<WizardState> _mapJoiningCongregationToState(
int identifier, int password) async*
_subscription?.cancel();
_subscription= (_provider.doThings(
id: identifier, password: password))
.listen((progress) =>
dispatch(Event(
progressMessage: progress.progressText))
, onError: (error)
print(error);
, onDone: ()
print('done joiining');
);
那么在提供者/服务中...这是第一次尝试。
final StreamController<Progress> _progressStream = StreamController<JoinCongregationProgress>();
@override
Stream<JoinCongregationProgress> doThings(
int id, int password) async*
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake1..."));
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake5!!!..."));
yield* _progressStream.stream;
yield 语句返回,但只有 两个等待的函数都完成了。这对我来说完全有意义,显然我不希望代码在等待“等待”完成之前以某种方式运行 yield*。
为了“订阅”该服务的进度,我需要将流返回给调用者,在 UI 上写入更新等。在我看来,这就像移动 yield* 一样简单到第一次等待之前。像这样。
final StreamController<Progress> _progressStream = StreamController<JoinCongregationProgress>();
@override
Stream<JoinCongregationProgress> doThings(
int id, int password) async*
yield* _progressStream.stream;
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake1..."));
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake5!!!..."));
但是,在后面的 _progressStream.add 调用上设置断点表明这些永远不会被调用。我被困在这个问题上,知道它可能是什么吗?我知道这与我如何混合 Futures 和 Streams 有关。
【问题讨论】:
【参考方案1】:yield*
等待它返回的流的完成。
在这种情况下,您希望立即返回一个流,然后将一些数据异步馈送到该流中。
有什么else 向流控制器添加事件吗?如果没有,您应该可以这样做:
@override
Stream<JoinCongregationProgress> doThings(int id, int password) async*
await Future.delayed(Duration(seconds:2));
yield JoinCongregationProgress(progressText: "kake1...");
await Future.delayed(Duration(seconds:2));
yield JoinCongregationProgress(progressText: "kake5!!!...");
不需要流控制器。
如果其他功能也添加到流控制器,那么您确实需要它。然后,您必须将流创建拆分为更新流控制器的异步部分和返回流的同步部分。也许:
final StreamController<Progress> _progressStream = StreamController<JoinCongregationProgress>();
@override
Stream<JoinCongregationProgress> doThings(int id, int password)
() async
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake1..."));
await Future.delayed(Duration(seconds:2));
_progressStream.add(JoinCongregationProgress(progressText: "kake5!!!..."));
(); // Spin off async background task to update stream controller.
return _progressStream.stream;
【讨论】:
这看起来不错。我认为你的格式在帖子末尾有点不稳定......我现在试试这个,让你知道我的进展情况:D 谢谢。Wonkyness 已移除。以上是关于如何从使用 Futures/async/await 的流中正确生成?的主要内容,如果未能解决你的问题,请参考以下文章