缓存活动请求的机制(协程)

Posted

技术标签:

【中文标题】缓存活动请求的机制(协程)【英文标题】:Mechanism for caching active requests (coroutines) 【发布时间】:2021-11-29 08:06:30 【问题描述】:

我需要实现活动请求的缓存,我做了如下:

private val requestJobCache: MutableMap<RequestBody, Deferred<Response>> = mutableMapOf()

suspend fun fetch(body: RequestBody): Response 
    // ... some code


suspend fun get(body: RequestBody): Response 
    if (requestJobCache.containsKey(body)) 
        return requestJobCache[body]!!.await()
    

    return coroutineScope 
        try 
            val request = async  fetch(body) 

            requestJobCache[body] = request
            return@coroutineScope request.await()
        
        finally 
            val finishedRequestJob = requestJobCache.remove(body)

            // ...
            // what does finishedRequestJob?.isCompleted equal?
        
    

这段代码有效,但有一个奇怪的地方:理论上finishedRequestJob?.IsCompleted 应该总是返回true,因为这段代码是在异步函数收到结果后在finally 块中执行的。然而,在实践中,此方法有时会返回false

为什么会发生这种情况,我的推理在哪里犯了错误,以及如何正确实施?

【问题讨论】:

你真的应该同步访问requestJobCache。我不确定这是否是这里发生的情况,但请注意,例如在requestJobCache.containsKey(body)requestJobCache[body]!!.await() 之间,另一个线程/协程可能remove() 项目,导致NPE。同样,多个协程可能会检查containsKey() 是否有相同的项目,然后它们都会启动请求,然后finishedRequestJob?.isCompleted 其中一个会返回false @broot,好点子!最初的实现使用ConcurrentHashMap,但我不知道这个方法对于kotlin有多惯用,我要找/问一个单独的问题。 使用ConcurrentHashMap并不能解决这种情况,因为问题出现在containsKey()requestJobCache[body]之间(获取和设置)。您需要使用互斥锁或类似技术,但coroutineScope() 让它变得更难了。 coroutineScope 有哪些陷阱?我不能在我使用集合的任何地方都使用withLock 吗? 看,互斥锁并不是一个神奇的东西,它会自动使代码线程安全。如果您将containsKey() 放入withLock()requestJobCache[body] 放入另一个withLock(),那么您仍然会遇到完全相同的问题。关键是您必须确保在检查containsKey() 和从中检索/添加到它之间没有人会触摸requestJobCache。您需要将containsKey()requestJobCache[body] 放在一个withLock() 块中。问题是:我们不能在withLock() 中挂起,因为我们会阻止所有试图使用get() 函数的人。 【参考方案1】:

造成这种情况的一个可能原因是对requestJobCache 的访问未同步。例如,在requestJobCache.containsKey(body)requestJobCache[body]!!.await() 之间,另一个线程/协程可能remove() 项目,导致NullPointerException。类似地,多个协程可能会检查containsKey() 是否有相同的项目,然后它们都会发起请求,然后finishedRequestJob?.isCompleted 会返回false

要解决这个问题,我们需要同步对requestJobCache 的访问。在这种情况下,由于挂起和coroutineScope(),这有点复杂,但我们应该能够使用如下代码解决这些问题:

private val requestJobCache: MutableMap<RequestBody, CompletableDeferred<Response>> = mutableMapOf()
private val requestJobCacheLock = Mutex()

@Suppress("DeferredResultUnused")
suspend fun get(body: RequestBody): Response 
    val (existed, deferred) = requestJobCacheLock.withLock 
        requestJobCache[body]
            ?.let  true to it 
            ?: (false to CompletableDeferred<Response>().also 
                requestJobCache[body] = it
            )
    

    return if (existed) 
        deferred.await()
     else 
        val result = runCatching  fetch(body) 
        deferred.completeWith(result)

        requestJobCacheLock.withLock 
            requestJobCache.remove(body)
        

        result.getOrThrow()
    

请注意,我没有机会测试此代码,因此它可能包含错误。我希望你能明白。

【讨论】:

以上是关于缓存活动请求的机制(协程)的主要内容,如果未能解决你的问题,请参考以下文章

beego20性能

给协程加上同步互斥机制

http缓存机制

浏览器缓存机制(HTTP缓存机制)

前端http请求细节——Cache-Control(缓存机制)

Glide 缓存机制及源码