如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?

Posted

技术标签:

【中文标题】如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?【英文标题】:How can I make sure all Kotlin coroutines created by a ktor websocket client are cleared up? 【发布时间】:2019-12-26 01:25:40 【问题描述】:

我正试图围绕 Kotlin 协程和 Ktors websocket 支持。我的理解是runBlocking 将创建一个作用域,并且只要该作用域(或子作用域)内有协程,它就会阻塞,但是当下面测试中对runBlocking 的调用返回时,仍然有两个协程活着..

为什么我在这里泄漏协程?

package dummy

import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.wss
import io.ktor.http.HttpMethod
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readBytes
import io.ktor.http.cio.websocket.readText
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.DebugProbes
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@ExperimentalCoroutinesApi
@KtorExperimentalAPI
class WebsocketTest 

    @Test
    fun tidy() 
        DebugProbes.install()

        runBlocking 
            val socketJob = Job()

            launch(CoroutineName("Websocket") + socketJob) 
                println("Connecting to websocket")
                connectWebsocket(socketJob)
                println("Websocket dead?")
            

            launch(CoroutineName("Ninja socket killer")) 
                delay(3500)
                println("Killing websocket client")
                socketJob.cancel(message = "Time to die..")
            
        

        println("\n\n-------")
        DebugProbes.dumpCoroutines(System.err)
        Assertions.assertEquals(0, DebugProbes.dumpCoroutinesInfo().size, "It would be nice if all coroutines had been cleared up by now..")
    




@KtorExperimentalAPI
private suspend fun connectWebsocket(socketJob: CompletableJob) 

    val client = HttpClient 
        install(WebSockets)
    

    socketJob.invokeOnCompletion 
        println("Shutting down ktor http client")
        client.close()
    

    client.wss(
            method = HttpMethod.Get,
            host = "echo.websocket.org",
            port = 443,
            path = "/"
    ) 

        send(Frame.Text("Hello World"))

        for (frame in incoming) 
            when (frame) 
                is Frame.Text -> println(frame.readText())
                is Frame.Binary -> println(frame.readBytes())
            

            delay(1000)
            send(Frame.Text("Hello World"))
        
    


build.gradle.kts

import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestLogEvent

plugins 
    kotlin("jvm") version "1.3.41" apply true


repositories 
    mavenCentral()


val ktorVersion = "1.2.3"
val junitVersion = "5.5.1"

dependencies 
    implementation(kotlin("stdlib-jdk8"))

    implementation("io.ktor:ktor-client-websockets:$ktorVersion")
    implementation("io.ktor:ktor-client-okhttp:$ktorVersion")

    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-RC2")


    testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")


tasks.withType<Test> 
    useJUnitPlatform()
    testLogging 
        showExceptions = true
        showStackTraces = true
        exceptionFormat = TestExceptionFormat.FULL
        events = setOf(TestLogEvent.PASSED, TestLogEvent.SKIPPED, TestLogEvent.FAILED)
    

【问题讨论】:

刚找到这个 gem:github.com/ktorio/ktor/blob/master/ktor-utils/jvm/src/io/ktor/… 似乎其中一个“泄露”的协程已明确启动到全局范围内。 【参考方案1】:

似乎我已经想通了(显然是在我把头发扯得足够长以首先发表这篇文章之后)。当我写这篇文章时,我泄露了两个协程,其中一个“自行解决”(我对此不太满意,但无论我做什么我都无法重现它)。

第二个协程泄露是因为来自 Ktor 的 Nonce.kt 明确地在 GlobalScope 中启动了一个协程。

https://github.com/ktorio/ktor/blob/master/ktor-utils/jvm/src/io/ktor/util/Nonce.kt#L30

private val nonceGeneratorJob =
GlobalScope.launch(
    context = Dispatchers.IO + NonCancellable + NonceGeneratorCoroutineName,
    start = CoroutineStart.LAZY
)  ....

【讨论】:

以上是关于如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Ktor-client for Android 中清除不记名令牌

尝试在客户端接收数据时,Ktor-websocket 库不执行任何操作

将自定义标头设置为 websocket 请求 (ktor)

Ktor websockets 身份验证

ktor websocket flow api是如何工作的?

如何在 Ktor websockets 上发送 ping