Web 多线程开发利器 Comlink 的剖析与思考

Posted 奇舞精选

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Web 多线程开发利器 Comlink 的剖析与思考相关的知识,希望对你有一定的参考价值。

机制,我们在发起异步请求或定时等操作后,处理完地回调会放入任务队列,在执行栈空时,处理任务队列中的回调,因此不会阻塞主线程,参考下图:

Node、Deno 环境同样使用事件循环机制进行处理,不过在模型上存在差异。关于事件循环的具体细节本文不会细说,但核心思想在于:任务队列 + 异步回调

事实上,即使存在事件循环机制,某些任务依然会极大地占用主线程,例如近无限循环,会直接导致 CPU 占用 100%,此时后续的所有任务被阻塞,页面卡住,甚至失去响应,这在用户体验上是非常不友好的。但往往这样的任务不可避免,通常我们将其分为两类:

  • CPU 密集型:完成计算所需的时间主要受限于 CPU 的计算
  • I/O 密集型:完成计算所需的时间主要受限于输入/输出操作
  • 此时,多线程往往能起到关键性的作用,目前绝大多数现代计算机都拥有多核心,多线程处理能力,如果能物尽其用,必然是极好的。

    查看逻辑处理器内核数量

    函数,根据变量提升规则,只会有最后一个生效。那么想要执行不同的操作,除了新开一个工作线程外(失去意义),就只能在这个监听函数中通过 switchif 进行返回,这样违反了单一职责原则。

    呢?

    对于刚才提到的问题一,我们可以通过异步接口的形式返回想要的结果,得益于 ES6 中的 Promise 对象,通常我们对于异步的写法如下:

    onmessage 封装成一个函数,该函数返回一个 Promise,通过调用,进行“异步”操作?

    这当然是可以的。那么,这个函数必然在工作线程中,我们怎么去调用工作线程中的函数进行操作呢?

    RPC:Remote Procedure Call (https://en.wikipedia.org/wiki/Remote_procedure_call),远程过程调用,指调用不同于当前上下文环境的方法,通常可以是不同的线程、域、网络主机,通过提供的接口进行调用。

    通过 RPC 方式,我们可以达到想要的目的。这里就会介绍本文的主角 Comlink (https://github.com/GoogleChromeLabs/comlink)!

    没有条件,就要创造条件

    Comlink.expose(obj),它通过这种方式,将工作线程脚本中的上下文暴露给主线程环境中,下面通过查看部分核心代码来了解其具体的实现方式。

    函数的具体实现:

    返回了一个 Proxy 对象,并且代理了 getsetapplyconstruct 四种不同的操作。如 obj.counter 操作,又会返回一个新的 Proxy 对象。此处需要注意的是,await obj.counter,会访问 Proxy 对象上的 then 属性,因此会进入 if (prop === "then") 判断,执行 requestResponseMessage 函数:

    postMessage 呈现在眼前,所以当访问代理对象上的属性时,其实是发送了 GET 消息到工作线程,把真实值通过消息返回,形成看上去是本地调用的假象。
    再来看 expose 函数的具体实现:

    指向工作线程上下文环境,addEventListenerstart 开始发送和监听消息队列,本质和方法 onmessage (https://developer.mozilla.org/zh-CN/docs/Web/API/MessagePort#方法) 一致,这也印证了 wrap 的设想 —— 取工作线程上下文中对象的值,并通过消息返回。

    此处仅例举了 GET 操作,从 switch-case 结构和 Proxy 对象拦截的操作可以看出,不同的操作,会进行相应的处理,本文不一一详述。

    由此可见,Comlink 采用的 RPC 代理方式,并不是传递上下文环境,因为这是非常危险的,而且函数传递时会导致 Uncaught (in promise) DOMException: Failed to execute \'postMessage\' on \'Worker\': xxx could not be cloned. 报错。它本质上依然是 MessagePort 消息通讯,不过封装了我们所头疼的“操作判断”,并以一种更优雅的方式(Proxy + Promise)来处理。另外除了简单的调用和取值外,Comlink 还支持回调 (https://github.com/GoogleChromeLabs/comlink#callbacks) 和共享线程 (https://github.com/GoogleChromeLabs/comlink#sharedworker),感兴趣的可以自行了解。

    window.opener,理论上说,Comlink 的实现方式都可以适用于这些场景。

    回到最初,通过 switch-case 或条件判断来扩展函数逻辑,往往是我们能想到的第一种解决方案,因为违反了单一职责原则,被无情抛弃。但是如果这种方式能够进行一定程度内聚,往往会有出其不意的效果,这样的设计思维方式一样可以适用于其他领域。



    Web Workers RPC:Comlink 源码解析

    上篇文章,有提及 Web Workers RPC 以解决浏览器不阻塞UI的问题,其中 comlink 是一把利器,本文就 comlink 的关键源码进行解析。

    Comlink 通过提供 RPC 实现将基于 Worker.postMessage(someObject) 的 API 变成了对开发人员更友好的“类似本地调用”方式。

    拆解源码之前,先介绍几个重要的概念:Proxy、Channel Messaging API、Transferable objects

    注意:worker 创建完成后,每次通信都是新建 MessageChannel,目的是避免消息冲突。

    重要概念

    proxy

    new Proxy(target, handler)
    
    • target 被代理的对象
    • handler 被代理对象上的自定义行为
    handler 处理函数说明
    get劫持获取属性值
    set劫持设置属性值
    apply劫持函数调用
    construct劫持 new 操作符

    apply

    function sum(a, b) 
      return a + b
    
    
    const handler = 
      apply: function(target, thisArg, args) 
        return target(...args) * 10
      
    ;
    
    const proxy1 = new Proxy(sum, handler)
    
    console.log(sum(1, 2))			// 3
    console.log(proxy1(1, 2)) 	// 30
    

    construct

    class P 
      constructor (name) 
        this.name = name
      
      sayName () 
        console.log(this.name)
        return this.name
      
    
    
    const ProxyP = new Proxy(P, 
      construct (target, args) 
        return new target(...args)
      
    )
    new ProxyP('LiGang').sayName()	// LiGang
    

    Channel Messaging API

    Channel Messaging API 允许两个不同的脚本运行在同一个文档的不同浏览器上下文(比如两个 iframe,或者文档主体和一个 iframe,使用 SharedWorker 的两个文档,或者两个 worker)来直接通讯,在每端使用一个端口(port)通过双向频道(channel)向彼此传递消息。

    使用 MessageChannel() 构造函数来创建通讯信道。一旦创建,信道的两个端口即可通过 MessageChannel.port1MessageChannel.port2 属性进行访问(都会返回 MessagePort 对象)。

    MessageChannel 以 DOM Event 的形式发送消息,所以它属于异步的宏任务。

    示例:作为 EventEmitter 事件订阅发布使用,实现脚本间通信

    /* one.js */
    export default function (port)  port.onmessage/port.postMessage 
    /* two.js */
    export default function (port)  port.onmessage/port.postMessage 
    
    /* index.js */
    import one from 'one.js'
    import two from 'two.js'
    
    const  port1, port2  = new MessageChannel()
    one(port1)
    two(port2)
    

    通过 Channel Messaging 进行通信,也可以完成 worker 和 worker 直接通信,无需主进程。

    Transferable objects

    可转移对象是拥有可以从一个上下文转移到另一个上下文的资源的对象,确保资源一次只能在一个上下文中可用。转移后,原始对象不再可用;它不再指向传输的资源,任何读取或写入对象的尝试都将引发异常。

    可转移对象通常用于共享一次只能安全地暴露给单个 JavaScript 线程的资源。

    支持的对象:ArrayBuffer、MessagePort、ReadableStream、TransformStream、AudioData、ImageBitmap 等

    Channel Messaging API 的 MessageChannel 接口允许我们创建一个新的消息通道,并通过它的两个 MessagePort 属性(port1/port2)发送数据。

    myWorker.postMessage(aMessage, transferList)
    transferList(可选):一个可选的Transferable对象的数组,用于传递所有权。如果一个对象的所有权被转移,在发送它的上下文中将变为不可用(中止),并且只有在它被发送到的 worker 中可用。

    源码解析

    1. 通过 Proxy 对 wrap(worker) 劫持相关操作;
    2. 通过 ep(worker/MessageChannel)进行 on message 以及 postMessage 操作;
      • 基本类型:直接通过 worker 传递;
      • 非基本类型:需要通过 MessageChannel 传递port,进行 expose、wrap 处理
    3. 通过 toWireValue/fromWireValue 对通信原始数据处理
    wrap

    ① createProxy(Proxy:get/set/apply/construct):创建后,会生成相应 Proxy
    ② processArguments : Proxy(get/set/apply/construct) 劫持处理
    ③ toWireValue:对传入参数进行统一格式处理 [type,name,value, [transferables]]
    ④ serialize(MessageChannel)=> expose:创建 MessageChannel 通信管道、同时监听 worker 的返回(通过 expose,下面介绍)
    ⑤ requestResponseMessage(on message、postMessage):监听消息,同时发送当前信息

    // ①
    function createProxy<T>(
      ep: Endpoint,
      path: (string | number | symbol)[] = [],
      target: object = function () 
    ): Remote<T> 
    	return  new Proxy(target, 
        get() ,
        set() ,
        apply() ,
        construct() 
          const [argumentList, transferables] = processArguments(rawArgumentList);
          // ⑤
          return requestResponseMessage(...).then(fromWireValue)
        
          
    
    
    // ②
    function processArguments(argumentList: any[]): [WireValue[], Transferable[]] 
      const processed = argumentList.map(toWireValue);
      return [processed.map((v) => v[0]), myFlat(processed.map((v) => v[1]))];
    
    
    // ③    
    function toWireValue(value: any): [WireValue, Transferable[]] 
      for (const [name, handler] of transferHandlers) 
         const [serializedValue, transferables] = handler.serialize(value);
      
    
       
    const proxyTransferHandler: TransferHandler<object, MessagePort> = 
      canHandle: (val): val is ProxyMarked =>
        isObject(val) && (val as ProxyMarked)[proxyMarker],
      // ④       
      serialize(obj) 
        const  port1, port2  = new MessageChannel();
        expose(obj, port1);
        return [port2, [port2]];
      ,
      deserialize(port) 
        port.start();
        return wrap(port);
      ,
        
    
    // ⑤
    function requestResponseMessage(): Promise<WireValue> 
      return new Promise((resolve) => 
        ep.addEventListener("message", function l(ev: MessageEvent) 
          resolve(ev.data);
         as any);
        ep.postMessage( id, ...msg , transfers);
      );
    
    

    关于 get 中 then 的特别说明:

    根据 ECMAScript® 2022 Language Specification 中 await 的描述:

    • await value 在内部实现中会变成 await Promise.resolve(value)
    • Promise.resolve 的处理中 则会获取 value.then 的值,如果它是一个函数则会通过它创建一个 Promise Job。

    await value => await Promise.resolve(value) => await then

    下述例子中 value 等于 success

    const value = await 
      then: (resolve, reject) => 
        resolve('success')
      
    
    
    expose

    ① on message:监听 message 事件
    ② fromWireValue:对接受参数进行统一格式处理
    ③ deserialize => wrap:发送消息对列、同时代理相关内容(通过 wrap,上面介绍)
    ④ GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE:针对不同MessageType,执行不同逻辑
    ⑤ returnValue :依据分支 ④ 产生返回结果
    ⑥ toWireValue
    ⑦ serialize(MessageChannel)
    ⑧ postMessage(transferables):发送

    export function expose(obj: any, ep: Endpoint = self as any) 
      // ①
      ep.addEventListener("message", function callback(ev: MessageEvent) 
        // ② ③
        const argumentList = (ev.data.argumentList || []).map(fromWireValue);
        switch (type) 
          // ④  
        	case GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE
            // ⑤
          	returnValue = ...  
        
        Promise.resolve(returnValue)
          .catch((value) => 
          return  value, [throwMarker]: 0 ;
        ).then((returnValue) => 
          // ⑥ ⑦ 
          const [wireValue, transferables] = toWireValue(returnValue);
          // ⑧
          ep.postMessage( ...wireValue, id , transferables);
        );
      
    
    
    // ②
    function fromWireValue(value: WireValue): any 
      switch (value.type) 
        case WireValueType.HANDLER:
          // ③
          return transferHandlers.get(value.name)!.deserialize(value.value);
        case WireValueType.RAW:
          return value.value;
      
       
      
    const proxyTransferHandler: TransferHandler<object, MessagePort> = 
      deserialize(port) 
        port.start();
        // 执行 wrap 流程
        return wrap(port);
      ,
       
    

    使用

    如果大家项目中需要使用 webWorker,强烈推荐大家尝试 Comlink,Comlink 同项目结合,可以使用 comlink-loader

    以上是关于Web 多线程开发利器 Comlink 的剖析与思考的主要内容,如果未能解决你的问题,请参考以下文章

    Web Workers RPC:Comlink 源码解析

    Web Workers RPC:Comlink 源码解析

    52java多线程剖析

    前端开发技术-剖析JavaScript单线程

    剖析管理所有大数据组件的可视化利器:Hue

    性能优化利器:剖析MySQL 5.7新特征 sys schema