来自阻塞代码的异步/等待 kotlin 协程

Posted

技术标签:

【中文标题】来自阻塞代码的异步/等待 kotlin 协程【英文标题】:Async/await kotlin coroutines from blocking code 【发布时间】:2020-12-13 03:16:44 【问题描述】:

我正在使用没有响应式 Web 的 Spring Boot。

我尝试使用 Kotlin 协程运行一些异步请求

    @GetMapping
    fun test(): Message 
        val restTemplate = RestTemplate()
        return runBlocking 
            val hello = async  hello(restTemplate) 
            val world = async  world(restTemplate) 
            Message("$hello.await() $world.await()!")
        
    

    private suspend fun world(restTemplate: RestTemplate): String 
        logger.info("Getting WORLD")
        return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
    

    private suspend fun hello(restTemplate: RestTemplate): String 
        logger.info("Getting HELLO")
        return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
    

但是这段代码是按顺序运行的。

我该如何解决?

【问题讨论】:

restTemplate.getForEntity 是挂起函数吗? 没有。这不是暂停乐趣 仅仅将一个函数标记为挂起并不能使它成为可挂起或异步的,另一个副作用是runBlocking是单线程的,所以单线程将首先启动然后被阻塞,然后只有第二个请求将发生。您必须使用 withContext(Dispatchers.IO) /* Blocking Call */ 包装阻塞调用。 【参考方案1】:

TL;DRasync 与用于卸载阻塞IO 的自定义调度程序(例如Dispatchers.IO)一起使用。

val hello = async(Dispatchers.IO)  hello(restTemplate) 
val world = async(Dispatchers.IO)  world(restTemplate) 

更新: 我在Kotlin coroutines slack channel 中被告知,我可以使用async(Dispatchers.IO) 而不是使用async + withContext(Dispatchers.IO)

我使用了@Sergey Nikulitsa 代码并创建了一个扩展函数,它接受一个带有接收器的 lambda(类似于 async)来组合 asyncwithContext(Dispatches.IO)

import kotlinx.coroutines.*

fun <T> CoroutineScope.myRestTemplateAsync(
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> 

    return async(Dispatchers.IO, start) 
        block() 
    

然后它可以像这样在你的代码中使用:


@GetMapping
fun test(): Message 
    val restTemplate = RestTemplate()
    return runBlocking 
        val hello = myRestTemplateAsync  hello(restTemplate) 
        val world = myRestTemplateAsync  world(restTemplate) 
        Message("$hello.await() $world.await()!")
    


private suspend fun world(restTemplate: RestTemplate): String 
    logger.info("Getting WORLD")
    return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload


private suspend fun hello(restTemplate: RestTemplate): String 
    logger.info("Getting HELLO")
    return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
 

初步结果

此时,我只是在试验这种方法,我只使用 Spring WebMVC 和 RestTemplate 进行 5 次以上的调用。

myRestTemplateAsync 扩展函数与同步对应函数相比,始终将执行时间缩短了 30% 到 50%

为什么只使用 async 就可以解决问题?

特别是对于 RestTemplate,在 coroutineScope 中使用 async ... 似乎没有什么不同,并且执行时间与同步代码相当。

此外,查看分析器中的线程,在单独使用 async 时没有创建“Dispatcher Workers”。这让我相信 RestTemplate 的每请求线程模型阻塞了整个线程。

async 中指定了新的调度程序时,它会将协程(和函数block)的执行转移到Dispatchers.IO 线程池中的新线程。

在这种情况下,代码块应包含 RestTemplate 调用(单个调用)。据我所知,这可以防止 RestTemplate 阻塞原始上下文。

您为什么要使用这种方法?

如果您一直在大型项目中使用 RestTemplate(每个请求线程模型),那么仅将其替换为 WebClient 等非阻塞客户端可能是一项艰巨的任务。有了这个,您可以继续使用您的大部分代码,只需在您可以异步进行多次调用的代码区域添加myRestTemplateAsync

如果您要开始一个新项目,请不要使用RestTemplate。相反最好使用WebFlux with coroutines in Kotlin as explained in this article。

这是个好主意吗?

目前,我没有足够的信息来说明一种或另一种方式。我希望进行更广泛的测试和评估:

负载下的内存消耗 负载下线程池可能耗尽 异常是如何传播和处理的

如果您对为什么这可能是一个好主意或可能不是一个好主意有任何意见,请在下面发布。

【讨论】:

【参考方案2】:

该代码是并行工作的:

    @GetMapping
    fun test(): Message 
        val restTemplate = RestTemplate()
        return runBlocking 
            val hello = async  hello(restTemplate) 
            val world = async  world(restTemplate) 
            Message("$hello.await() $world.await()!")
        
    

    private suspend fun world(restTemplate: RestTemplate): String 
        logger.info("Getting WORLD")
        return withContext(Dispatchers.IO) 
            restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
        
    

    private suspend fun hello(restTemplate: RestTemplate): String 
        logger.info("Getting HELLO")
        return withContext(Dispatchers.IO) 
            restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
        
    

【讨论】:

该代码并行运行。但我发现我无法让 restTemplate 同时获得两个响应。【参考方案3】:

也许根本原因是:

restTemplate 使用 java.io(不是 java.nio) restTemplate 阻止当前线程,直到它得到 HTTP 响应 协程魔法在这种情况下不起作用

解决方案:

使用使用 java.nio 的 http-client

【讨论】:

【参考方案4】:

runBlocking:它旨在将常规阻塞代码连接到以挂起样式编写的库,用于主要功能和测试。

这里我们使用coroutineScope 方法创建一个CoroutineScope。这个函数是为并行分解工作而设计的。当此范围内的任何子协程失败时,此范围将失败,并且所有其余子协程都将被取消。

因为coroutineScope是挂起函数,所以我们将fun test()标记为suspend fun(挂起函数只允许从协程或其他挂起函数中调用)。通过使用CoroutineScope对象,我们可以调用asynclaunch来启动协程

  @GetMapping
  suspend fun test(): Message 
        val restTemplate = RestTemplate()
        return coroutineScope 
            val hello = async  hello(restTemplate) 
            val world = async  world(restTemplate) 
            Message("$hello.await() $world.await()!")
        
    

【讨论】:

也许你可以添加一些文字来解释为什么这样有效?有代码固然很好,但对于可能还不知道的人来说,这并不是不言而喻的。 Spring 框架不允许对休息控制器使用挂起功能(开箱即用) 实现(“org.springframework.boot:spring-boot-starter-webflux”) 基本上 spring-boot-starter-webflux 带来了对挂起函数的支持。 baeldung.com/spring-boot-kotlin-coroutines

以上是关于来自阻塞代码的异步/等待 kotlin 协程的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程 异步 异步流

Kotlin协程基础

Kotlin 协程协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )

Kotlin 协程协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )

kotlin协程硬核解读(1. 协程初体验)

kotlin协程硬核解读(1. 协程初体验)