Kotlin协程-select基础

Posted 且听真言

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程-select基础相关的知识,希望对你有一定的参考价值。

一、select是什么?

select——>用于选择更快的结果。

基于场景理解

比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。

如何实现上述逻辑:

   runBlocking 
        suspend fun getCacheInfo(productId: String): Product 
            delay(100L)
            return Product(productId, 8.9)
        

        suspend fun getNetworkInfo(productId: String): Product? 
            delay(200L)
            return Product(productId, 8.8)
        

        fun updateUI(product: Product) 
            println("$product.productId : $product.price")
        

        val startTime = System.currentTimeMillis()
        val productId = "001"
        val cacheInfo = getCacheInfo(productId)
        if (cacheInfo != null) 
            updateUI(cacheInfo)
            println("Time cost: $System.currentTimeMillis() - startTime")
        

        val latestInfo = getNetworkInfo(productId)
        if (latestInfo != null) 
            updateUI(latestInfo)
            println("Time cost: $System.currentTimeMillis() - startTime")
        
    


001 : 8.9
Time cost: 113
001 : 8.8
Time cost: 324

上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。

用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。

runBlocking 

        suspend fun getCacheInfo(productId: String): Product 
            delay(100L)
            return Product(productId, 8.9)
        

        suspend fun getNetworkInfo(productId: String): Product 
            delay(200L)
            return Product(productId, 8.8)
        

        fun updateUI(product: Product) 
            println("$product.productId : $product.price")
        

        val startTime = System.currentTimeMillis()
        val productId = "001"

        val product = select<Product?> 

            async 
                getCacheInfo(productId)
            .onAwait 
                it
            

            async 
                getNetworkInfo(productId)
            .onAwait 
                it
            
        

        if (product != null) 
            updateUI(product)
            println("Time cost: $System.currentTimeMillis() - startTime")
        
    


001 : 8.9
Time cost: 134

Process finished with exit code 0

由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。

在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。

runBlocking 
        suspend fun getCacheInfo(productId: String): Product 
            delay(100L)
            return Product(productId, 8.9)
        

        suspend fun getNetworkInfo(productId: String): Product 
            delay(200L)
            return Product(productId, 8.8)
        

        fun updateUI(product: Product) 
            println("$product.productId : $product.price")
        

        val startTime = System.currentTimeMillis()
        val productId = "001"

        val cacheDeferred = async 
            getCacheInfo(productId)
        
        val latestDeferred = async 
            getNetworkInfo(productId)
        

        val product = select<Product?> 

            cacheDeferred.onAwait 
                it.copy(isCache = true)
            

            latestDeferred.onAwait 
                it.copy(isCache = false)
            
        

        if (product != null) 
            updateUI(product)
            println("Time cost: $System.currentTimeMillis() - startTime")
        

        if (product != null && product.isCache) 
            val latest = latestDeferred.await() ?: return@runBlocking
            updateUI(latest)
            println("Time cost: $System.currentTimeMillis() - startTime")
        

    


001 : 8.9
Time cost: 124
001 : 8.8
Time cost: 228

Process finished with exit code 0

如果是多个服务的缓存场景呢?

 runBlocking 

        val startTime = System.currentTimeMillis()
        val productId = "001"


        suspend fun getCacheInfo(productId: String): Product 
            delay(100L)
            return Product(productId, 8.9)
        

        suspend fun getCacheInfo2(productId: String): Product 
            delay(50L)
            return Product(productId, 8.85)
        

        suspend fun getNetworkInfo(productId: String): Product 
            delay(200L)
            return Product(productId, 8.8)
        

        fun updateUI(product: Product) 
            println("$product.productId : $product.price")
        

        val cacheDeferred = async 
            getCacheInfo(productId)
        

        val cacheDeferred2 = async 
            getCacheInfo2(productId)
        

        val latestDeferred = async 
            getNetworkInfo(productId)
        

        val product = select<Product?> 
            cacheDeferred.onAwait 
                it.copy(isCache = true)
            

            cacheDeferred2.onAwait 
                it.copy(isCache = true)
            

            latestDeferred.onAwait 
                it.copy(isCache = true)
            
        
        if (product != null) 
            updateUI(product)
            println("Time cost: $System.currentTimeMillis() - startTime")
        

        if (product != null && product.isCache) 
            val latest = latestDeferred.await()
            updateUI(latest)
            println("Time cost: $System.currentTimeMillis() - startTime")
        

    


Log

001 : 8.85
Time cost: 79
001 : 8.8
Time cost: 229

Process finished with exit code 0

select 代码模式,可以提升程序的整体响应速度。

二、select 和 Channel

runBlocking 
        val startTime = System.currentTimeMillis()
        val channel1 = produce 
            send(1)
            delay(200L)
            send(2)
            delay(200L)
            send(3)
        

        val channel2 = produce 
            delay(100L)
            send("a")
            delay(200L)
            send("b")
            delay(200L)
            send("c")
        



        channel1.consumeEach 
            println(it)
        
        channel2.consumeEach 
            println(it)
        

        println("Time cost: $System.currentTimeMillis() - startTime")
    


Log

1
2
3
a
b
c
Time cost: 853

Process finished with exit code 0

上述代码串行执行,可以使用select进行优化。

  runBlocking 
        val startTime = System.currentTimeMillis()
        val channel1 = produce 
            send(1)
            delay(200L)
            send(2)
            delay(200L)
            send(3)
        

        val channel2 = produce 
            delay(100L)
            send("a")
            delay(200L)
            send("b")
            delay(200L)
            send("c")
        

        suspend fun selectChannel(
            channel1: ReceiveChannel<Int>,
            channel2: ReceiveChannel<String>
        ): Any 
            return select<Any> 
                if (!channel1.isClosedForReceive) 
                    channel1.onReceive 
                        it.also 
                            println(it)
                        
                    
                

                if (!channel2.isClosedForReceive) 
                    channel2.onReceive 
                        it.also 
                            println(it)
                        
                    
                
            
        

        repeat(6) 
            selectChannel(channel1, channel2)
        
        println("Time cost: $System.currentTimeMillis() - startTime")
    

Log
1
a
2
b
3
c
Time cost: 574

Process finished with exit code 0

从代码执行结果可以发现程序的执行耗时有效减少。onReceive 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。 执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。

如果Channel1不生产数据了,程序会如何执行?

runBlocking 
        val startTime = System.currentTimeMillis()
        val channel1 = produce<String> 
            delay(5000L)
        

        val channel2 = produce<String> 
            delay(100L)
            send("a")
            delay(200L)
            send("b")
            delay(200L)
            send("c")
        

        suspend fun selectChannel(
            channel1: ReceiveChannel<String>,
            channel2: ReceiveChannel<String>
        ): String = select<String> 
            channel1.onReceive 
                it.also 
                    println(it)
                
            
            channel2.onReceive 
                it.also 
                    println(it)
                
            
        
        repeat(3) 
            selectChannel(channel1, channel2)
        
        println("Time cost: $System.currentTimeMillis() - startTime")
    

Log
a
b
c
Time cost: 570

Process finished with exit code 0

 

如果不知道Channel的个数,如何避免ClosedReceiveChannelException?

使用:onReceiveCatching 

runBlocking 
        val startTime = System.currentTimeMillis()
        val channel1 = produce<String> 
            delay(5000L)
        

        val channel2 = produce<String> 
            delay(100L)
            send("a")
            delay(200L)
            send("b")
            delay(200L)
            send("c")
        


        suspend fun selectChannel(
            channel1: ReceiveChannel<String>,
            channel2: ReceiveChannel<String>
        ): String = select<String> 
            channel1.onReceiveCatching 
                it.getOrNull() ?: "channel1 is closed!"
            
            channel2.onReceiveCatching 
                it.getOrNull() ?: "channel2 is closed!"
            
        

        repeat(6) 
            val result = selectChannel(channel1, channel2)
            println(result)
        

        println("Time cost: $System.currentTimeMillis() - startTime")
    

Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 584

Process finished with exit code 0

得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。

所以我们需要在6次repeat之后将channel关闭。

runBlocking 
        val startTime = System.currentTimeMillis()
        val channel1 = produce<String> 
            delay(15000L)
        

        val channel2 = produce<String> 
            delay(100L)
            send("a")
            delay(200L)
            send("b")
            delay(200L)
            send("c")
        


        suspend fun selectChannel(
            channel1: ReceiveChannel<String>,
            channel2: ReceiveChannel<String>
        ): String = select<String> 
            channel1.onReceiveCatching 
                it.getOrNull() ?: "channel1 is closed!"
            
            channel2.onReceiveCatching 
                it.getOrNull() ?: "channel2 is closed!"
            
        

        repeat(6) 
            val result = selectChannel(channel1, channel2)
            println(result)
        

        channel1.cancel()
        channel2.cancel()
        println("Time cost: $System.currentTimeMillis() - startTime")
    

Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 612

Process finished with exit code 0

 Deferred、Channel 的 API:


public interface Deferred : CoroutineContext.Element 
    public suspend fun join()
    public suspend fun await(): T
  
    public val onJoin: SelectClause0
    public val onAwait: SelectClause1<T>


public interface SendChannel<in E> 
    public suspend fun send(element: E)

   
    public val onSend: SelectClause2<E, SendChannel<E>>



public interface ReceiveChannel<out E> 
    public suspend fun receive(): E

    public suspend fun receiveCatching(): ChannelResult<E>
   
    public val onReceive: SelectClause1<E>
    public val onReceiveCatching: SelectClause1<ChannelResult<E>>

当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。

通过 async 并发执行协程,也可以借助 select 得到最快的结果。

 runBlocking 
        suspend fun <T> fastest(vararg deferreds: Deferred<T>): T = select 
            fun cancelAll() = deferreds.forEach 
                it.cancel()
            

            for (deferred in deferreds) 
                deferred.onAwait 
                    cancelAll()
                    it
                
            
        

        val deferred1 = async 
            delay(100L)
            println("done1")
            "result1"
        

        val deferred2 = async 
            delay(200L)
            println("done2")
            "result2"
        


        val deferred3 = async 
            delay(300L)
            println("done3")
            "result3"
        


        val deferred4 = async 
            delay(400L)
            println("done4")
            "result4"
        

        val deferred5 = async 
            delay(5000L)
            println("done5")
            "result5"
        

        val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
        println(fastest)
    

Log

done1
result1

Process finished with exit code 0

 

以上是关于Kotlin协程-select基础的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )

Kotlin 协程协程底层实现 ① ( Kotlin 协程分层架构 | 基础设施层 | 业务框架层 | 使用 Kotlin 协程基础设施层标准库 Api 实现协程 )

Kotlin 协程协程底层实现 ① ( Kotlin 协程分层架构 | 基础设施层 | 业务框架层 | 使用 Kotlin 协程基础设施层标准库 Api 实现协程 )

Kotlin:协程范围与协程上下文

Kotlin协程基础

Kotlin协程-并发处理-基础