Flutter 异步编程 - 捌 | 计算耗时? Isolate 来帮忙

Posted 嘴巴吃糖了

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flutter 异步编程 - 捌 | 计算耗时? Isolate 来帮忙相关的知识,希望对你有一定的参考价值。

一、问题引入 - 计算密集型任务

假如现在有个需求,我想要计算 1 亿1~10000 间随机数的平均值,在界面上显示结果,该怎么办?
可能有小伙伴踊跃发言:这还不简单,生成 1 亿 个随机数,算呗。


1. 搭建测试场景

如下,写个简单的测试界面,界面中有计算结果和耗时的信息。点击运行按钮,触发 _doTask 方法进行运算。计算完后将结果展示出来:

代码详见: 【async/isolate/01】

void _doTask() 
  int sum = 0;
  int startTime = DateTime.now().millisecondsSinceEpoch;
  for(int i = 0;i<count;i++)
    sum += random.nextInt(10000);
  
  int endTime = DateTime.now().millisecondsSinceEpoch;
  result = sum/count;
  cost =  endTime - startTime;
  setState(() );


可以看到,这样是可以实现需求的,总耗时在 8.5 秒左右。细心的朋友可能会发现,在点击按键触发 _doTask 时,FloatingActionButton 的水波纹并没有出现,仿佛是卡死一般。为了应证这点,我们再进行一个对比实验。

请点击前请点击后

2. 计算耗时阻塞

如下所示,我们让 CupertinoActivityIndicator 一直处于运动状态,作为界面 未被卡死 的标志。当点击运行时,可以看出指示器被卡住了, 再点击按钮也没有任何的水波纹反映,这说明:

计算的耗时任务会阻塞 Dart 的线程,界面因此无法有任何响应。

未执行前执行前后
Row(
  mainAxisAlignment: MainAxisAlignment.center,
  children: const [Text("动画指示器示意: "), CupertinoActivityIndicator()],
),

3. 计算耗时阻塞的解决方案

有人说,用异步的方式触发 _doTask 呗,比如用 FuturescheduleMicrotask 包一下,或 Stream 异步处理。有这个想法的人可以试一试,如果你看懂前面几篇看到了原理,就知道是不可行的,这些工具只不过是回调包装而已。只要计算的任务仍是 Dart 在单线程中处理的,就无法避免阻塞。现在的问题相当于:

一个人无法同时做 洗漱扫地 的任务。

一旦阻塞,界面就无法有任何响应,自然也无法展示加载中的动画,这对于用户体验来说是极其糟糕的。那如何让计算密集型的耗时任务,在处理时不阻塞呢? 我们可以好好品味一下这句话:

这句话言外之意给出了两种解决方案:

【1】. 将计算密集型的耗时任务,从 Dart 端剥离,交由 其他机体 来处理。
【2】. 在 Dart 中通过 多线程 的方式处理,从而不阻塞主线程。

方式一其实很好理解,比如耗时的任务交由服务端来完成,客户端通过 接口请求 ,获取响应结果。这样计算型的密集任务,对于 Flutter 而言,就转换成了一个网络的 IO 任务。或者通过 插件 的方式,将计算的耗时任务交由平台来通过多线程处理,而 Dart 端只需要通过回调处理即可,也不会阻塞。

方式一处理的本质上都是将计算密集型的任务转移到其他机体中,从而让 Dart 避免处理计算密集型的耗时任务。这种方式需要其他语言或后端的支持,想要实现是有一定门槛的。那如何直接在 Flutter 中,通过 Dart 语言处理计算密集型的任务呢?

这就是我们今天的主角: Isolate 。 可能很多人潜意识里 Dart 是单线程模型,无法通过多线程的处理任务,这种认知就狭隘了。其实 Dart 提供了 Isolate, 本质上是通过 C++ 创建线程,隔离出另一份区间来通过 Dart 处理任务。它相当于线程的一种上层封装,屏蔽了很多内部细节,可以通过 Dart 语言直接操作。


二、从 compute 函数认识 Isolate

首先,我们通过 compute 函数认识一下计算密集型的耗时任务该如何处理。 compute 函数字如其名,用于处理计算。只要简单看一下,就知道它本身是 Isolate 的一个简单的封装使用方式。它作为全局函数被定义在 foundation/isolates.dart 中:


1. 认识 compute 函数

既然是函数,那使用时就非常简单,调用就行了。关于函数的调用,比较重要的是 入参返回值泛型。从上面函数定义中可以看出,它就是 isolate 包中的 compute 函数, 其中泛型有两个 QR ,返回值是 R 泛型的 Future 对象,很明显该泛型表示结果 Result;第二入参是 Q 泛型的 message ,表示消息类型;第三入参是可选参数,用于调试时的标签。

---->[_isolates_io.dart#compute]----
/// The dart:io implementation of [isolate.compute].
Future<R> compute<Q, R>(
    isolates.ComputeCallback<Q, R> callback, 
    Q message, 
     String? debugLabel ) 
async 

看到这里,很自然地就可以想到,这里第一参中传入的 callback 就是计算任务,它将被在其他的 isolate 中被执行,然后返回计算结果。下面我们来看一下在当前场景下的使用方式。在此之前,先封装一下返回的结果。通过 TaskResult 记录结果,作为 compute 的返回值:

代码详见: 【async/isolate/02_compute】

class TaskResult 
  final int cost;
  final double result;

  TaskResult(required this.cost, required this.result);


2. compute 函数的使用

compute 方法在传入两个参数,其一是 _doTaskInCompute ,也就是计算的耗时任务,其二是传递的信息,这里不需要,传空值字符串。虽然方法的泛型可以不传,但严谨一些的话,可也以把泛型加上,这样可读性更好一些:

void _doTask() async 
  TaskResult taskResult = await compute<String, TaskResult>(
      _doTaskInCompute, '',
      debugLabel: "task1");
  setState(() 
    result = taskResult.result;
    cost = taskResult.cost;
  );


对于 compute 而言,传入的回调有一个非常重要的注意点:

函数必须是 静态函数 或者 全局函数

static Random random = Random();

static Future<TaskResult> _doTaskInCompute(String arg) async 
  int count = 100000000;
  double result = 0;
  int cost = 0;
  int sum = 0;
  int startTime = DateTime.now().millisecondsSinceEpoch;
  for (int i = 0; i < count; i++) 
    sum += random.nextInt(10000);
  
  int endTime = DateTime.now().millisecondsSinceEpoch;
  result = sum / count;
  cost = endTime - startTime;
  return TaskResult(
    result: result,
    cost: cost,
  );


下面看一下用和不用 compute 处理的效果差异,如下左图是使用 compute 的效果,在进行计算的同时指示器的动画仍在运动,桌面计算操作并未影响主线程,界面仍可以触发响应,这就和前面产生了鲜明的对比。

用 compute不用 compute

3. 理解 compute 的作用

如下,在 _doTaskInCompute 中打断点调试一下,可以看出此时除了 main 还有一个 task1 的栈帧。此时断点停留在新帧中, main 仍处于运行状态:

这就相当于计算任务不想自己处理,找另外一个人来做。每块处理任务的单元,就可以视为一个 isolate。它们之间的信息数据在内存中是不互通的,这也是为什么起名为 隔离 isolate 的原因。 这种特性能非常有效地避免多线程中操作同一内存数据的风险。 但同时也需要引入一个 通信机制 来处理两个 isolate 间的通信。

其实这和 客户端 - 服务端 的模型非常相似,通过 发送端 SendPort 发送消息,通过接收端 RawReceivePort 接收消息。从 compute 方法的源码中可以简单地看出,其本质是通过 Isolate.spawn 实现的 Isolate 创建。

这里有个小细节要注意,通过多次测试发现 compute 中的计算耗时要普遍高于主线程中的耗时。这并不是说新建的 isolate 在计算能力上远小于 主 isolate, 毕竟这里是 1 亿 次的计算,任何微小的细节都将被放大 1 亿 倍。这里的关注点应在于 新 isolate 可以独立于 主 isolate 运行,并且可以通过通信机制将结果返回给 主 isolate


4. compute 参数传递与多个 isolate

如果是大量的相互独立的计算耗时任务,可以开启多个 isolate 共同处理,最后进行结果汇总。比如这里 1 亿 次的计算,我们可以开 2isolate , 分别处理 5000 万 个计算任务。如下所示,总耗时就是 6 秒左右。当然创建 isolate 也是有资源消耗的,并不是说创建 100 个就能把耗时降低 100 倍。

关于传参非常简单,compute 第一泛型是参数类型,这里可以指定 int类型作为 _doTaskInCompute 任务的入参,指定计算的次数。这里通过两个 compute 创建两个 isolate 同时处理 5000 万 个随机数的的平均值,来模拟那些相互独立的任务:

代码详见: 【async/isolate/03_compute】

最后通过 Future.await 对多个异步任务进行结果汇总,示意图如下,这样就相当于又开了一个 isolate 进行处理计算任务:

对于 isolate 千万不要盲目使用,一定要认清当前任务是否真有必要使用。比如几百微秒就能处理完成的任务,用 isolate 就是拿导弹打蚊子。或者那些并非由 Dart 端处理的 IO 密集型 任务,用 isolate 就相当于你打开了烧水按钮,又找来一个人专门看着烧水的过程。这种多此一举的行为,都是对于异步不理解的表现。

一般而言,客户端中并没有太多需要处理复杂计算的场景,只有一些特定场景的软件,比如需要进行大量的文字解析、复杂的图片处理等。


三、分析 compute 函数的源码实现

到这可能有人觉得,新开一个 isolate好简单啊,compute 函数处理一下就好啦。但是,简单必然有简单的 局限性,仔细思考一下,会发现 compute 函数有个缺陷:它只会 "闷头干活",只有任务完成才会通过 Future 通知 main isolate

也就是说,对于 UI 界面来说无法无法感知到 任务执行进度 信息,处理展示 计算中... 之外没什么能干的。这在某些特别耗时的场景中会造成用户的等待焦虑,我们需要让干活的 isolate 抽空通知一下 main isolate,所以对 isolate 之间的通信方式,是有必要了解的。

既然 compute 在完成任务时可以进行一次通信,那么就可以从 compute 函数的源码中去分析这种通信的方式。


1. 接收端口的创建与处理器设置

如下所示,在一开始会创建一个 Flow 对象,从该对象的成员中可以看出,它只负责维护两个整型 id_type 的数值信息。接下来会创建 RawReceivePort 对象,是不是有点眼熟?


还记得那个经常在面前晃的 _RawRecivePortImpl类吗? RawReceivePort 的默认工厂构造方法创建的就是 _RawReceivePortImpl 对象,如下代码所示:

---->[isolate_patch.dart/RawReceivePort]----
@patch
class RawReceivePort 
  @patch
  factory RawReceivePort([Function? handler, String debugName = '']) 
    _RawReceivePortImpl result = new _RawReceivePortImpl(debugName);
    result.handler = handler;
    return result;
  


接下来,会创建一个 Completer 对象,并在为 port 设置信息的 handler 处理器,在处理回调中触发 completer#complete 方法,表示异步任务完成。也就是说处理器接收信息之时,就是 completer 中异步任务完成之日。

如果不知道 Completer 和接收端口设置 handler 是干嘛的,可以分别到 【第五篇·第二节】【第六篇·第一节】 温故,这里就不赘述了。

---->[_isolates_io.dart#compute]----
final Completer<dynamic> completer = Completer<dynamic>();
port.handler = (dynamic msg) 
  timeEndAndCleanup();
  completer.complete(msg);
;

2. 认识 Isolate.spawn 方法

接下来会触发 Isolate.spawn 方法,该方法是生成 isolate 的核心。其中传入的 回调 callback消息 message 以及发送的端口 SendPort 会组合成 _IsolateConfiguration 作为第二参数:

通过 Isolate.spawn 方法的定义可以看出,第一参是一个入口函数,第二参是函数入参。所以上面红框中的对象将作为 _spawn 函数的入参。从这里可以看出第一参 _spawn 函数应该是在新 isolate 中执行的。

external static Future<Isolate> spawn<T>(
    void entryPoint(T message), T message,
    bool paused = false,
    bool errorsAreFatal = true,
    SendPort? onExit,
    SendPort? onError,
    @Since("2.3") String? debugName);

下面是在耗时任务中打断点的效果,其中很清晰地展现出 _spawn 方法到 _doTaskInCompute 的过程。

如下,是 _spawn 的处理流程,上面的调试发生在 127 行,此时触发回调方法,获取结果。然后在关闭 isolate 时,将结果发送出去,流程其实并不复杂。

有一个小细节,结果通过 _buildSuccessResponse 方法处理了一下,关闭时发送的消息是列表,后期会根据列表的长度判断任务处理的正确性。

List<R> _buildSuccessResponse<R>(R result) 
  return List<R>.filled(1, result);


3. 异步任务的结束

从前面测试中可以知道 compute 函数返回值是一个泛型为结果的 Future 对象,那这个返回值是什么呢?如下可以看出当结果列表长度为 1 表示任务成功完成,返回 completer 任务结果的首元素:

再结合 completer 触发 complete 完成的时机,就不难知道。最终的结果是由接收端接收到的信息,调试如下:

也就是说,isolate 关闭时发送的信息,将会被 接收端的处理器 监听到。这就是 compute 函数源码的全部处理逻辑,总的来看还是非常简单的。就是,使用 Completer ,基于 Isolate.spawn 的简单封装,屏蔽了用户对 RawReceivePort 的感知,从而简化使用。


四、Isolate 发送和接收消息的使用

通过 compute 函数我们知道 isoalte 之间有着一套消息 发送 - 监听 的机制。我们可以利用这个机制在某些时刻发送进度消息传给 main isolate,这样 UI 界面中就可以展示出 耗时任务 的进度。如下所示,每当 100 万次 计算时,发送消息通知 main isolate :


1. 使用 Isolate.spawn

compute 函数为了简化使用,将 发送 - 监听 的处理封装在了内部,用户无法操作。使用为了能使用该功能,我们可以主动来使用 Isolate.spawn 。如下所示,创建 RawReceivePort,并设置 handler 处理器器,这里通过 handleMessage 函数来单独处理。

代码详见: 【async/isolate/04_spawn】

然后调用 Isolate.spawn 来开启新 isolate,其中第一参是在新 isolate 中处理的耗时任务,第二参是任务的入参。这里将发送端口传入 _doTaskInCompute 方法,以便发送消息:

void _doTask() async 
  final receivePort = RawReceivePort();
  receivePort.handler = handleMessage;
  await Isolate.spawn(
    _doTaskInCompute,
    receivePort.sendPort,
    onError: receivePort.sendPort,
    onExit: receivePort.sendPort,
  );


2. 通过端口发送消息

SendPort 传入 _doTaskInCompute 中,如下 tag1 处,可以每隔 1000000 次发送一次进度通知。在任务完成后,使用 Isolate.exit 方法关闭当前 isolate 并发送结果数据。

static void _doTaskInCompute(SendPort port) async 
  int count = 100000000;
  double result = 0;
  int cost = 0;
  int sum = 0;
  int startTime = DateTime.now().millisecondsSinceEpoch;
  for (int i = 0; i < count; i++) 
    sum += random.nextInt(10000);
    if (i % 1000000 == 0)  // tag1
      port.send(i / count);
    
  
  int endTime = DateTime.now().millisecondsSinceEpoch;
  result = sum / count;
  cost = endTime - startTime;
  Isolate.exit(port, TaskResult(result: result, cost: cost));


3. 通过接收端处理消息

接下来只要在 handleMessage 方法中处理发送端传递的消息即可,可以根据消息的类型判断是什么消息,比如这里如果是 double 表示是进度,通知 UI 更新进度值。另外,如果不同类型的消息非常多,也可以自己定义一套发送结果的规范方便处理。

void handleMessage(dynamic msg) 
  print("=========$msg===============");
  if (msg is TaskResult) 
    progress = 1;
    setState(() 
      result = msg.result;
      cost = msg.cost;
    );
  
  if (msg is double) 
    setState(() 
      progress = msg;
    );
  


其实学会了如何通过 Isolate.spawn 处理计算耗时任务,以及通过 SendPort-RawReceivePort 处理 发送 - 监听 消息,就能满足绝大多数对 Isolate 的使用场景。如果不需要在任务执行过程中发送通知,使用 compute 函数会方便一些。最后还是要强调一点,不要滥用 Isolate ,使用前动动脑子,思考一下是否真的是计算耗时任务,是否真的需要在 Dart 端来完成。开一个 isolate 至少要消耗 30 kb

作者:张风捷特烈
链接:https://juejin.cn/post/7163431846783483912

最后

如果想要成为架构师或想突破20~30K薪资范畴,那就不要局限在编码,业务,要会选型、扩展,提升编程思维。此外,良好的职业规划也很重要,学习的习惯很重要,但是最重要的还是要能持之以恒,任何不能坚持落实的计划都是空谈。

如果你没有方向,这里给大家分享一套由阿里高级架构师编写的《android八大模块进阶笔记》,帮大家将杂乱、零散、碎片化的知识进行体系化的整理,让大家系统而高效地掌握Android开发的各个知识点。

相对于我们平时看的碎片化内容,这份笔记的知识点更系统化,更容易理解和记忆,是严格按照知识体系编排的。

全套视频资料:

一、面试合集

二、源码解析合集


三、开源框架合集


欢迎大家一键三连支持,若需要文中资料,直接点击文末CSDN官方认证微信卡片免费领取↓↓↓

以上是关于Flutter 异步编程 - 捌 | 计算耗时? Isolate 来帮忙的主要内容,如果未能解决你的问题,请参考以下文章

flutter 单线程异步 及 isolate 使用过程遇到的问题

flutter 单线程异步 及 isolate 使用过程遇到的问题

究竟什么是异步编程?

Flutter学习-Dart异步

究竟什么是异步编程?

异步编程利器:CompletableFuture详解