Web 多线程开发利器 Comlink 的剖析与思考
Posted 奇舞精选
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Web 多线程开发利器 Comlink 的剖析与思考相关的知识,希望对你有一定的参考价值。
Node、Deno 环境同样使用事件循环机制进行处理,不过在模型上存在差异。关于事件循环的具体细节本文不会细说,但核心思想在于:任务队列 + 异步回调。
事实上,即使存在事件循环机制,某些任务依然会极大地占用主线程,例如近无限循环,会直接导致 CPU 占用 100%,此时后续的所有任务被阻塞,页面卡住,甚至失去响应,这在用户体验上是非常不友好的。但往往这样的任务不可避免,通常我们将其分为两类:
此时,多线程往往能起到关键性的作用,目前绝大多数现代计算机都拥有多核心,多线程处理能力,如果能物尽其用,必然是极好的。
查看逻辑处理器内核数量
函数,根据变量提升规则,只会有最后一个生效。那么想要执行不同的操作,除了新开一个工作线程外(失去意义),就只能在这个监听函数中通过 switch
或 if
进行返回,这样违反了单一职责原则。呢?对于刚才提到的问题一,我们可以通过异步接口的形式返回想要的结果,得益于 ES6 中的 Promise 对象,通常我们对于异步的写法如下:
和 onmessage
封装成一个函数,该函数返回一个 Promise,通过调用,进行“异步”操作?这当然是可以的。那么,这个函数必然在工作线程中,我们怎么去调用工作线程中的函数进行操作呢?
RPC:Remote Procedure Call (https://en.wikipedia.org/wiki/Remote_procedure_call),远程过程调用,指调用不同于当前上下文环境的方法,通常可以是不同的线程、域、网络主机,通过提供的接口进行调用。
通过 RPC 方式,我们可以达到想要的目的。这里就会介绍本文的主角 Comlink (https://github.com/GoogleChromeLabs/comlink)!
没有条件,就要创造条件
和 Comlink.expose(obj)
,它通过这种方式,将工作线程脚本中的上下文暴露给主线程环境中,下面通过查看部分核心代码来了解其具体的实现方式。函数的具体实现:返回了一个 Proxy 对象,并且代理了 get
,set
,apply
,construct
四种不同的操作。如 obj.counter
操作,又会返回一个新的 Proxy 对象。此处需要注意的是,await obj.counter
,会访问 Proxy 对象上的 then
属性,因此会进入 if (prop === "then")
判断,执行 requestResponseMessage
函数:和 postMessage
呈现在眼前,所以当访问代理对象上的属性时,其实是发送了 GET 消息到工作线程,把真实值通过消息返回,形成看上去是本地调用的假象。
再来看 expose
函数的具体实现:指向工作线程上下文环境,addEventListener
和 start
开始发送和监听消息队列,本质和方法 onmessage (https://developer.mozilla.org/zh-CN/docs/Web/API/MessagePort#方法) 一致,这也印证了 wrap
的设想 —— 取工作线程上下文中对象的值,并通过消息返回。此处仅例举了 GET 操作,从 switch-case 结构和 Proxy 对象拦截的操作可以看出,不同的操作,会进行相应的处理,本文不一一详述。
由此可见,Comlink 采用的 RPC 代理方式,并不是传递上下文环境,因为这是非常危险的,而且函数传递时会导致 Uncaught (in promise) DOMException: Failed to execute \'postMessage\' on \'Worker\': xxx could not be cloned.
报错。它本质上依然是 MessagePort 消息通讯,不过封装了我们所头疼的“操作判断”,并以一种更优雅的方式(Proxy + Promise)来处理。另外除了简单的调用和取值外,Comlink 还支持回调 (https://github.com/GoogleChromeLabs/comlink#callbacks) 和共享线程 (https://github.com/GoogleChromeLabs/comlink#sharedworker),感兴趣的可以自行了解。
与 window.opener
,理论上说,Comlink 的实现方式都可以适用于这些场景。回到最初,通过 switch-case 或条件判断来扩展函数逻辑,往往是我们能想到的第一种解决方案,因为违反了单一职责原则,被无情抛弃。但是如果这种方式能够进行一定程度内聚,往往会有出其不意的效果,这样的设计思维方式一样可以适用于其他领域。
等多种发展方向供员工选择,并辅以提供相应的技术力、专业力、通用力、领导力等培训课程。奇舞团以开放和求贤的心态欢迎各种优秀人才关注和加入奇舞团。
Web Workers RPC:Comlink 源码解析
上篇文章,有提及 Web Workers RPC 以解决浏览器不阻塞UI的问题,其中 comlink 是一把利器,本文就 comlink 的关键源码进行解析。
Comlink 通过提供 RPC 实现将基于 Worker.postMessage(someObject)
的 API 变成了对开发人员更友好的“类似本地调用”方式。
拆解源码之前,先介绍几个重要的概念:Proxy、Channel Messaging API、Transferable objects
注意:worker 创建完成后,每次通信都是新建 MessageChannel,目的是避免消息冲突。
重要概念
proxy
new Proxy(target, handler)
target
被代理的对象handler
被代理对象上的自定义行为
handler 处理函数 | 说明 |
---|---|
get | 劫持获取属性值 |
set | 劫持设置属性值 |
apply | 劫持函数调用 |
construct | 劫持 new 操作符 |
apply
function sum(a, b)
return a + b
const handler =
apply: function(target, thisArg, args)
return target(...args) * 10
;
const proxy1 = new Proxy(sum, handler)
console.log(sum(1, 2)) // 3
console.log(proxy1(1, 2)) // 30
construct
class P
constructor (name)
this.name = name
sayName ()
console.log(this.name)
return this.name
const ProxyP = new Proxy(P,
construct (target, args)
return new target(...args)
)
new ProxyP('LiGang').sayName() // LiGang
Channel Messaging API
Channel Messaging API 允许两个不同的脚本运行在同一个文档的不同浏览器上下文(比如两个 iframe,或者文档主体和一个 iframe,使用 SharedWorker 的两个文档,或者两个 worker)来直接通讯,在每端使用一个端口(port)通过双向频道(channel)向彼此传递消息。
使用 MessageChannel()
构造函数来创建通讯信道。一旦创建,信道的两个端口即可通过 MessageChannel.port1
和 MessageChannel.port2
属性进行访问(都会返回 MessagePort
对象)。
MessageChannel 以 DOM Event
的形式发送消息,所以它属于异步的宏任务。
示例:作为 EventEmitter 事件订阅发布使用,实现脚本间通信
/* one.js */
export default function (port) port.onmessage/port.postMessage
/* two.js */
export default function (port) port.onmessage/port.postMessage
/* index.js */
import one from 'one.js'
import two from 'two.js'
const port1, port2 = new MessageChannel()
one(port1)
two(port2)
通过 Channel Messaging 进行通信,也可以完成 worker 和 worker 直接通信,无需主进程。
Transferable objects
可转移对象是拥有可以从一个上下文转移到另一个上下文的资源的对象,确保资源一次只能在一个上下文中可用。转移后,原始对象不再可用;它不再指向传输的资源,任何读取或写入对象的尝试都将引发异常。
可转移对象通常用于共享一次只能安全地暴露给单个 JavaScript 线程的资源。
支持的对象:ArrayBuffer、MessagePort、ReadableStream、TransformStream、AudioData、ImageBitmap 等
Channel Messaging API 的 MessageChannel
接口允许我们创建一个新的消息通道,并通过它的两个 MessagePort 属性(port1/port2)发送数据。
myWorker.postMessage(aMessage, transferList)
transferList(可选):一个可选的Transferable
对象的数组,用于传递所有权。如果一个对象的所有权被转移,在发送它的上下文中将变为不可用(中止),并且只有在它被发送到的 worker 中可用。
源码解析
- 通过 Proxy 对
wrap(worker)
劫持相关操作; - 通过 ep(worker/MessageChannel)进行
on message
以及postMessage
操作;- 基本类型:直接通过 worker 传递;
- 非基本类型:需要通过 MessageChannel 传递port,进行 expose、wrap 处理
- 通过
toWireValue/fromWireValue
对通信原始数据处理
wrap
① createProxy(Proxy:get/set/apply/construct):创建后,会生成相应 Proxy
② processArguments : Proxy(get/set/apply/construct) 劫持处理
③ toWireValue:对传入参数进行统一格式处理 [type,name,value, [transferables]]
④ serialize(MessageChannel)=> expose:创建 MessageChannel 通信管道、同时监听 worker 的返回(通过 expose,下面介绍)
⑤ requestResponseMessage(on message、postMessage):监听消息,同时发送当前信息
// ①
function createProxy<T>(
ep: Endpoint,
path: (string | number | symbol)[] = [],
target: object = function ()
): Remote<T>
return new Proxy(target,
get() ,
set() ,
apply() ,
construct()
const [argumentList, transferables] = processArguments(rawArgumentList);
// ⑤
return requestResponseMessage(...).then(fromWireValue)
// ②
function processArguments(argumentList: any[]): [WireValue[], Transferable[]]
const processed = argumentList.map(toWireValue);
return [processed.map((v) => v[0]), myFlat(processed.map((v) => v[1]))];
// ③
function toWireValue(value: any): [WireValue, Transferable[]]
for (const [name, handler] of transferHandlers)
const [serializedValue, transferables] = handler.serialize(value);
const proxyTransferHandler: TransferHandler<object, MessagePort> =
canHandle: (val): val is ProxyMarked =>
isObject(val) && (val as ProxyMarked)[proxyMarker],
// ④
serialize(obj)
const port1, port2 = new MessageChannel();
expose(obj, port1);
return [port2, [port2]];
,
deserialize(port)
port.start();
return wrap(port);
,
// ⑤
function requestResponseMessage(): Promise<WireValue>
return new Promise((resolve) =>
ep.addEventListener("message", function l(ev: MessageEvent)
resolve(ev.data);
as any);
ep.postMessage( id, ...msg , transfers);
);
关于 get
中 then 的特别说明:
根据 ECMAScript® 2022 Language Specification 中 await 的描述:
await value
在内部实现中会变成await Promise.resolve(value)
- 而 Promise.resolve 的处理中 则会获取
value.then
的值,如果它是一个函数则会通过它创建一个 Promise Job。
await value => await Promise.resolve(value) => await then
下述例子中 value
等于 success
const value = await
then: (resolve, reject) =>
resolve('success')
expose
① on message:监听 message 事件
② fromWireValue:对接受参数进行统一格式处理
③ deserialize => wrap:发送消息对列、同时代理相关内容(通过 wrap,上面介绍)
④ GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE:针对不同MessageType,执行不同逻辑
⑤ returnValue :依据分支 ④ 产生返回结果
⑥ toWireValue
⑦ serialize(MessageChannel)
⑧ postMessage(transferables):发送
export function expose(obj: any, ep: Endpoint = self as any)
// ①
ep.addEventListener("message", function callback(ev: MessageEvent)
// ② ③
const argumentList = (ev.data.argumentList || []).map(fromWireValue);
switch (type)
// ④
case GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE
// ⑤
returnValue = ...
Promise.resolve(returnValue)
.catch((value) =>
return value, [throwMarker]: 0 ;
).then((returnValue) =>
// ⑥ ⑦
const [wireValue, transferables] = toWireValue(returnValue);
// ⑧
ep.postMessage( ...wireValue, id , transferables);
);
// ②
function fromWireValue(value: WireValue): any
switch (value.type)
case WireValueType.HANDLER:
// ③
return transferHandlers.get(value.name)!.deserialize(value.value);
case WireValueType.RAW:
return value.value;
const proxyTransferHandler: TransferHandler<object, MessagePort> =
deserialize(port)
port.start();
// 执行 wrap 流程
return wrap(port);
,
使用
如果大家项目中需要使用 webWorker,强烈推荐大家尝试 Comlink,Comlink 同项目结合,可以使用 comlink-loader
以上是关于Web 多线程开发利器 Comlink 的剖析与思考的主要内容,如果未能解决你的问题,请参考以下文章