这个用于减少异步迭代的 Promise 取消的实现是否正确?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了这个用于减少异步迭代的 Promise 取消的实现是否正确?相关的知识,希望对你有一定的参考价值。

我想为我的库中的一个方法启用 Promise 取消。reduce. 我只对取消异步迭代的Promise感兴趣,因为这些很有可能无限期地挂起。

const reduceAsyncIterable = async (fn, possiblyX0, state, x) => {
  const iter = x[Symbol.asyncIterator]()
  const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
  if (isUndefined(y0)) {
    throw new TypeError('reduce(...)(x); x cannot be empty')
  }
  let y = await fn(y0, (await iter.next()).value)
  for await (const xi of iter) {
    if (state.cancelled) return // stops async iterating if `cancel` called
    y = await fn(y, xi)
  }
  return y
}

const reduce = (fn, x0) => {
  if (!isFunction(fn)) {
    throw new TypeError('reduce(x, y); x is not a function')
  }
  return x => {
    if (isIterable(x)) return reduceIterable(fn, x0, x)
    if (isAsyncIterable(x)) {
      const state = { cancelled: false, resolve: () => {} }
      const p = new Promise((resolve, reject) => {
        state.resolve = resolve
        reduceAsyncIterable(fn, x0, state, x).then(
          y => state.cancelled || resolve(y)
        ).catch(reject)
      })
      p.cancel = () => { state.cancelled = true; state.resolve() } // shortcircuit the Promise `p` on `cancel` call
      return p
    }
    if (is(Object)(x)) return reduceObject(fn, x0, x)
    throw new TypeError('reduce(...)(x); x invalid')
  }
}

上面的代码似乎可以工作,但我不禁觉得这里有内存泄漏。尤其是在 await iter.next()for await (const xi of iter). 如果这些 await 语句需要花费很长时间(对于一个异步迭代器来说很可能是这样)。reduceAsyncIterable 可能永远不会返回。从用户的角度来看,这是很好的,因为短路发生在 reduce,因为用户看到的Promise已经解决了。但是从计算机的角度来看,取消这个操作的Promise会不会造成内存泄漏?

我希望能够使用 cancel 函数的返回承诺,就像这样。

const myOngoingTaskPromise = reduce(someReducer, null)(myInfiniteAsyncIterable)

myOngoingTaskPromise.cancel() // resolves myOngoingTaskPromise with undefined

myOngoingTaskPromise // Promise { undefined }
答案

我找到了方法。Promise.race 就像秘密武器什么的

    if (isAsyncIterable(x)) {
      const state = { cancel: () => {} }
      const cancelToken = new Promise((_, reject) => { state.cancel = reject })
      const p = Promise.race([
        reduceAsyncIterable(fn, x0, x),
        cancelToken,
      ])
      p.cancel = () => { state.cancel(new Error('cancelled')) }
      return p
    }

无记忆泄漏

reduce-async-iterable-promise-cancellation

以上是关于这个用于减少异步迭代的 Promise 取消的实现是否正确?的主要内容,如果未能解决你的问题,请参考以下文章

使用Promise.race实现超时机制取消XHR请求

对Promise中的resolve,reject,catch的理解

promise怎么实现异步

怎么用promise实现异步控制

迭代器,生成器(generator)和Promise的“微妙”关系

Promise对象