缓存活动请求的机制(协程)
Posted
技术标签:
【中文标题】缓存活动请求的机制(协程)【英文标题】:Mechanism for caching active requests (coroutines) 【发布时间】:2021-11-29 08:06:30 【问题描述】:我需要实现活动请求的缓存,我做了如下:
private val requestJobCache: MutableMap<RequestBody, Deferred<Response>> = mutableMapOf()
suspend fun fetch(body: RequestBody): Response
// ... some code
suspend fun get(body: RequestBody): Response
if (requestJobCache.containsKey(body))
return requestJobCache[body]!!.await()
return coroutineScope
try
val request = async fetch(body)
requestJobCache[body] = request
return@coroutineScope request.await()
finally
val finishedRequestJob = requestJobCache.remove(body)
// ...
// what does finishedRequestJob?.isCompleted equal?
这段代码有效,但有一个奇怪的地方:理论上finishedRequestJob?.IsCompleted
应该总是返回true
,因为这段代码是在异步函数收到结果后在finally
块中执行的。然而,在实践中,此方法有时会返回false
。
为什么会发生这种情况,我的推理在哪里犯了错误,以及如何正确实施?
【问题讨论】:
你真的应该同步访问requestJobCache
。我不确定这是否是这里发生的情况,但请注意,例如在requestJobCache.containsKey(body)
和requestJobCache[body]!!.await()
之间,另一个线程/协程可能remove()
项目,导致NPE。同样,多个协程可能会检查containsKey()
是否有相同的项目,然后它们都会启动请求,然后finishedRequestJob?.isCompleted
其中一个会返回false
。
@broot,好点子!最初的实现使用ConcurrentHashMap
,但我不知道这个方法对于kotlin有多惯用,我要找/问一个单独的问题。
使用ConcurrentHashMap
并不能解决这种情况,因为问题出现在containsKey()
和requestJobCache[body]
之间(获取和设置)。您需要使用互斥锁或类似技术,但coroutineScope()
让它变得更难了。
coroutineScope
有哪些陷阱?我不能在我使用集合的任何地方都使用withLock
吗?
看,互斥锁并不是一个神奇的东西,它会自动使代码线程安全。如果您将containsKey()
放入withLock()
和requestJobCache[body]
放入另一个withLock()
,那么您仍然会遇到完全相同的问题。关键是您必须确保在检查containsKey()
和从中检索/添加到它之间没有人会触摸requestJobCache
。您需要将containsKey()
和requestJobCache[body]
放在一个withLock()
块中。问题是:我们不能在withLock()
中挂起,因为我们会阻止所有试图使用get()
函数的人。
【参考方案1】:
造成这种情况的一个可能原因是对requestJobCache
的访问未同步。例如,在requestJobCache.containsKey(body)
和requestJobCache[body]!!.await()
之间,另一个线程/协程可能remove()
项目,导致NullPointerException
。类似地,多个协程可能会检查containsKey()
是否有相同的项目,然后它们都会发起请求,然后finishedRequestJob?.isCompleted
会返回false
。
要解决这个问题,我们需要同步对requestJobCache
的访问。在这种情况下,由于挂起和coroutineScope()
,这有点复杂,但我们应该能够使用如下代码解决这些问题:
private val requestJobCache: MutableMap<RequestBody, CompletableDeferred<Response>> = mutableMapOf()
private val requestJobCacheLock = Mutex()
@Suppress("DeferredResultUnused")
suspend fun get(body: RequestBody): Response
val (existed, deferred) = requestJobCacheLock.withLock
requestJobCache[body]
?.let true to it
?: (false to CompletableDeferred<Response>().also
requestJobCache[body] = it
)
return if (existed)
deferred.await()
else
val result = runCatching fetch(body)
deferred.completeWith(result)
requestJobCacheLock.withLock
requestJobCache.remove(body)
result.getOrThrow()
请注意,我没有机会测试此代码,因此它可能包含错误。我希望你能明白。
【讨论】:
以上是关于缓存活动请求的机制(协程)的主要内容,如果未能解决你的问题,请参考以下文章