Kotlin协程的迷惑

Posted zuguorui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程的迷惑相关的知识,希望对你有一定的参考价值。

Kotlin作为谷歌强力推广的android编程语言,是运行在jvm上的,在很多地方可以看做java语言披了一层语法糖,很多地方其实和java是相通的,包括各种库的使用。不过倒是有一个新东西:协程,这个对于java程序员来说可能就比较陌生。我也是研究了一下协程,作为一个备忘。

文章目录

1. 关键概念

1.1什么是协程?

协程是轻量级的线程

这是官方文档给的定义。虽然它和线程的关系看起来很像线程和进程的关系,但还是有很大不同的。最关键的一点在于,线程和进程在cpu允许的情况下,是真正的并行。而协程的允许范围仅仅在一个线程里,无论有多少个协程,在同一个线程内无论何时都只有一个协程的任务在进行。
那么为什么又要搞出协程这么个东西呢?它和普通的串行代码有什么不同吗?这里就要明确一个点,那就是我们有些代码其实在运行的时候并不是完全占满cpu的,通常的比如文件读写、网络活动等,在等待磁盘或者网络响应的时候,cpu其实是空闲的,但相关方法仍然在阻塞我们的线程。这个时候,协程就派上用场了,它在cpu空闲的时候转而去运行其他消耗cpu资源的代码,使得cpu以更加饱满的负荷去运行。这一点在官方代码中写得其实很迷惑,因为它用delay()这样一个非阻塞的方法来假设一段有用的代码,导致你会觉得仿佛协程真的在同步运行一样。

1.2 第一个协程代码

首先要添加coroutines依赖,它属于kotlinx包,在kotlin的sdk中它是自带的,idea在并不会默认包含。它和kotlin包都在同一个位置,打开项目依赖把它添加进去就好。

点击+

在这你就能找到kotlinx的包了。

import kotlinx.coroutines.*
fun main()


    GlobalScope.launch 
        for(i in 0..10)
        
            delay(200)
            println("job1: $i")
        
    

    GlobalScope.launch 
        for(i in 0..10)
        
            delay(200)
            println("job2: $i")
        
    

    Thread.sleep(3000)
    println("main func finish")

运行之后,代码就会交替打印job1和job2的内容。
那如果把Thread.sleep(3000)改成1000呢?可以看到launch块内的代码根本没有运行完。
先简要地介绍一下,GlobalScope其实是指定了另外一个线程在运行这个代码,所以两个launch块内的内容并不是和main在同一个线程,所以它们不会阻塞main方法,但是GlobalScope启动的协程的生命周期依赖于整个应用程序,当整个应用程序后,协程也会被取消掉,无论它是否运行完毕。
我们可以加个打印来验证一下:

fun main()


    GlobalScope.launch 

        for(i in 0..10)
        
            delay(200)
            println("job1: $i")
        
        println("job1 current thread is $Thread.currentThread().name")
    

    GlobalScope.launch 
        for(i in 0..10)
        
            delay(200)
            println("job2: $i")
        
        println("job2 current thread is $Thread.currentThread().name")
    

    Thread.sleep(3000)
    println("main func finish")
    println("main current thread is $Thread.currentThread().name")

最后的输出

...
job2: 10
job1: 10
job2 current thread is DefaultDispatcher-worker-14
job1 current thread is DefaultDispatcher-worker-13
main func finish
main current thread is main

很显然的,不在同一个线程里。
需要注意一点的是,并不是说启动协程的方法结束了,协程也会随之结束,比如:

fun doJob()

    GlobalScope.launch 

        for(i in 0..10)
        
            delay(200)
            println("job1: $i")
        
        println("job1 current thread is $Thread.currentThread().name")
    

    GlobalScope.launch 
        for(i in 0..10)
        
            delay(200)
            println("job2: $i")
        
        println("job2 current thread is $Thread.currentThread().name")
    




fun main()


    doJob()
    println("doJob finished")
    Thread.sleep(3000)
    println("main func finish")
    println("main current thread is $Thread.currentThread().name")

打印如下:

doJob finished
job2: 0
job1: 0
job1: 1
job2: 1
job2: 2
job1: 2
job2: 3
job1: 3
job1: 4
job2: 4
job1: 5
job2: 5
job1: 6
job2: 6
job2: 7
job1: 7
job2: 8
job1: 8
job2: 9
job1: 9
job2: 10
job1: 10
job2 current thread is DefaultDispatcher-worker-15
job1 current thread is DefaultDispatcher-worker-7
main func finish
main current thread is main

虽然doJob方法已经退出了,但是它启动的协程并没有结束。

1.3 runBlocking

上面讲了如何启动一个协程,GlobalScope是将协程放在应用生命周期中。但我们不能总用ThreadSleep去等待协程完毕吧。runBlocking和GlobalScope一样,定义了一个线程运行的作用域,不过它的作用域仅在当前线程当前方法中,它会阻塞线程,并等待其中启动的协程全部运行完毕才会退出。比如我们将上面那个例子的doJob函数改一下,并且把main中的Thread.sleep去掉

fun doJob()

    runBlocking 
        launch 

            for(i in 0..10)
            
                delay(200)
                println("job1: $i")
            
            println("job1 current thread is $Thread.currentThread().name")
        

        launch 
            for(i in 0..10)
            
                delay(200)
                println("job2: $i")
            
            println("job2 current thread is $Thread.currentThread().name")
        
    

运行后打印如下

...
job1: 10
job1 current thread is main
job2: 10
job2 current thread is main
doJob finished
main func finish
main current thread is main

可以看到,直到doJob启动的两个协程都运行完,doJob方法才退出,而且main中也不用Thread.sleep去等待协程了。

1.4 coroutineScope

与runBlocking非常相似,它们都会等待自己启动的协程运行完,但不同的是runBlocking会阻塞线程但是coroutineScope不会。这一点可能不太明显,必须对比起来才能理解,而官方那个例子其实非常模糊,甚至于并没有突出这个差别。这里我自己写一下代码。
首先修改doJob,我们使用runBlocking

fun doJob()

    runBlocking 

        launch 
            println("job1 start")
            for(i in 0..5)
            
                delay(200)
                println("job1: $i")
            
            println("job1 current thread is $Thread.currentThread().name")
        

        delay(200)//为了保证job1能先运行
        runBlocking 
            launch 
                println("job2 start")
                for(i in 0..5)
                
                    delay(200)
                    println("job2: $i")
                
                println("job2 current thread is $Thread.currentThread().name")
            


        
    


fun main()


    doJob()
    println("doJob finished")
    println("main func finish")
    println("main current thread is $Thread.currentThread().name")

运行,它的打印如下

job1 start
job2 start
job2: 0
job2: 1
job2: 2
job2: 3
job2: 4
job2: 5
job2 current thread is main
job1: 0
job1: 1
job1: 2
job1: 3
job1: 4
job1: 5
job1 current thread is main
doJob finished
main func finish
main current thread is main

我在第一层runBlocking里面新建了一个runBlocking。可以看到,当job1在运行后打印第一句后进入for循环的第一个delay 200ms。此时cpu空闲,转而去运行job2。而runBlocking是阻塞线程的,既然阻塞线程,那依附于线程的协程肯定会被阻塞了。于是乎仅当job2全部运行完后,job1才能接着运行。而第二层runBlocking前的delay是为了保证job1能先运行,可以更明显地看到现象,如果去掉后,则完全就是job2先运行,job1后运行了。

如果把第二层runBlocking换成coroutineScope

fun doJob()

    runBlocking 

        launch 
            println("job1 start")
            for(i in 0..5)
            
                delay(200)
                println("job1: $i")
            
            println("job1 current thread is $Thread.currentThread().name")
        

        delay(200)
        coroutineScope 
            launch 

                println("job2 start")
                for(i in 0..5)
                
                    delay(200)
                    println("job2: $i")
                
                println("job2 current thread is $Thread.currentThread().name")
            


        
    

输出

job1 start
job2 start
job1: 0
job2: 0
job1: 1
job2: 1
job1: 2
job2: 2
job1: 3
job2: 3
job1: 4
job2: 4
job1: 5
job1 current thread is main
job2: 5
job2 current thread is main
doJob finished
main func finish
main current thread is main

这次两个协程能够交替运行了。也就是coroutineScope并不会阻塞线程,在该线程的其他协程都有机会运行。

如果换个顺序

fun doJob()

    runBlocking 

        coroutineScope 
            launch 

                println("job2 start")
                for(i in 0..5)
                
                    delay(200)
                    println("job2: $i")
                
                println("job2 current thread is $Thread.currentThread().name")
            


        

        launch 
            println("job1 start")
            for(i in 0..5)
            
                delay(200)
                println("job1: $i")
            
            println("job1 current thread is $Thread.currentThread().name")
        



    

打印

job2 start
job2: 0
job2: 1
job2: 2
job2: 3
job2: 4
job2: 5
job2 current thread is main
job1 start
job1: 0
job1: 1
job1: 2
job1: 3
job1: 4
job1: 5
job1 current thread is main
doJob finished
main func finish
main current thread is main

看起来是job2先运行,是因为coroutineScope阻塞了线程吗?并不是,这是因为coroutineScope和runBlocking一样,都会等待自己启动的协程运行完毕才会退出。所以coroutineScope在前面运行会阻塞协程,直到运行完毕后,job1协程才真正开始建立,之后再运行。

在理解了这些后,剩下的一些概念比较简单。比如协程上下文调度器以及管道等,官方的例子也足够。这里就不重复了。需要的戳官方文档

2 协程?线程?

协程看起来貌似真的和线程差不多,那到底要怎么选呢?

2.1 有用的工作

不知道你是不是会疑惑上面的代码中频繁出现的delay函数。在官方文档中,它被用来假设为有用的工作来证明协程的高效性。然而事实真的如此?我们假设两个有用的工作。

首先是假设有个大量消耗cpu的计算型任务。首先使用协程来“并行”运行一次。

suspend fun doJob(count: Int, array: IntArray)

    var index = 0
    var i = 0
    while(i++ < count)//仅仅跑循环来消耗cpu。
    
        var j = 0
        while(j++ < count)
        
            index = index.rem(array.size)
            array[index++] = i + j
        
    


fun main()

    var array1 = IntArray(100)
    var array2 = IntArray(100)
	//用来计算runBlocking从进入到退出所占用的时间,别忘了runBlocking是会阻塞线程的,它会等待所有启动的协程都运行完才退出
    var time = measureTimeMillis 
        runBlocking 
			//启动两个协程开始任务
            launch 
                doJob(10000, array1)
            
            launch 
                doJob(10000, array2)
            

        
    
    println("time used $time")

输出结果是1580左右。每次运行会有微小偏差。
然后看串行运行的,修改main函数

fun main()

    var array1 = IntArray(100)
    var array2 = IntArray(100)

    var time = measureTimeMillis 
        runBlocking 
			//仅启动一个协程,单个协程里的代码是串行的。
            launch 
                doJob(10000, array1)
                doJob(10000, array2)
            
        
    
    println("time used $time")

打印出来仍然是1580左右。

现在让我们把这个任务修改一下,在doJob中加入delay函数。首先仍是并行的。

suspend fun doJob(count: Int, array: IntArray)

    var index = 0
    var i = 0
    while(i++ < count)
    
        var j = 0
        while(j++ < count)
        
            delay(4)//注意这块加了4ms延时
            index = index.rem(array.size)
            array[index++] = i + j
        
    


fun main()

    var array1 = IntArray(100)
    var array2 = IntArray(100)

    var time = measureTimeMillis 
        runBlocking 
			//启动两个协程,为了避免运行太长时间,循环次数少一些
            launch 
                doJob(20, array1)
            
            launch 
                doJob(20, array2)
            

        
    
    println("time used $time")


最终耗时6000ms左右。
然后看下串行的。修改main

fun main()

    var array1 = IntArray(100)
    var array2 = IntArray(100)

    var time = measureTimeMillis 
        runBlocking 
			//启动一个协程,单个协程里面的代码是串行的
            launch 
                doJob(20, array1)
                doJob(20, array2)
            
        
    
    println("time used $time")

最终耗时11800ms左右。

这下果然就减少了差不多一半运行时间。
所以基本就能得出一个结论,协程是否高效,关键在于你的代码中会不会有可能因为等待其他任务而导致cpu空闲,却又阻塞线程的任务。如果你的代码本来就会让cpu满负荷跑,那协程根本没用,反而会因为要建立协程上下文等操作更加耗时。但是如果某些代码,比如你在等待接收消息,如果这个消息传送并不会主动触发接受者,那你一般就需要单独开启一个线程去监视,而这个操作通常都是阻塞的,因为如果使用死循环去一直查询会导致cpu非常忙而影响到其他任务。这个时候协程就非常合适了。

那平时的业务场景,比如网络连接或者文件读写之类的呢?很遗憾,经过我的测试,它们仍然都是线程堵塞的,也就是协程完全不起作用。 使用文件写入进行测试,协程没有节省时间,这可能因为文件写入也挺快的,导致cpu其实没有多少空余时间。我又用okhttp进行网络测试,也没有节省时间,甚至于我直接指定了两个“不存在的”网址,直接让它超时,最终的结果也表明它们阻塞了线程,协程完全没有作用。除非有一天它们也都成为协程所要求的可挂起函数。

目前看来,协程比较理想的应用场景就是应用内的消息通道,亦或者作为生产者-消费者模型的实现。毕竟通道和流还是很符合需求的。

然而理想很美好,现实很骨感。

2.2 协程用于生产者-消费者模式的限制

这种场景下,官方推荐使用流和管道来实现。但是同一时间仅有生产者和消费者在运行,那么这和使用一个变量来承接产品,进行串行调用有什么区别呢?更何况如果生产者和消费者都需要花费大量的时间,鉴于目前无论文件或者网络连接,java内的库api都是线程阻塞的,亦或者你的任务根本就是一个cpu密集型的,那协程毫无意义。相反还会带来额外的开销。如果你使用诸如Dispatchers.Default或者Dispatchers.Unconfined这样可能或造成线程跳转的调度器,甚至于你可能一开始就指定了一个新线程的作用域,那你仍然无法逃避数据同步问题,更何况协程又极大地拔高了代码的复杂度。

2.3 管道和流用于消息传递的限制

本质上消息传递就是轻量级的生产者和消费者模式。如果使用Flow,代码结构会变得很奇怪,因为无论如何主体应该是核心线程,消息通知器仅仅是一个工具而已。使用Flow使得你不得不将产生消息的代码放在Flow中,并且最终你仅能针对这一个Flow发送它想要的消息。

这么一看貌似Channel是最合适的了。实际上对于Channel来说,单对单的消息传递没有问题。但是如果单对多,想要所有客户端都收到同一条消息是做不到的。并且如果在跨线程传递时,如果有个通道,需要的时候打开它,不需要的时候关闭以通知客户端消息结束(我不得不吐槽官方文档的诸多假设,delay就不说了,多数流和管道居然会假设客户端事先知道需要多少个消息?),实际上为了保证channel的状态不出问题,比如客户端在生产端已经关闭Channel的情况下去receive(这会导致异常,而且抛出的异常位置居然不是客户端的receive方法而是生产端的channel.close方法),你仍然不得不使用线程中那丑陋的同步方式。另外由于协程本身的send和receive方法相当于用挂起代替了传统方式中的阻塞,非常容易造成在锁定Channel的过程中无法发送或接受数据而挂起,另一方却因为无法拿到锁而无法接受或发送数据,进而导致死锁。

下面有个例子

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*


var mChannel: Channel<Int> = Channel<Int>()

class ConsumerThread(val threadID: Int, val channel: Channel<Int>): Thread()


    override fun run() 
        runBlocking 
            launch 
                println("consumer$threadID: run in thread $Thread.currentThread().name")
                while(true)
                
                    var b: Boolean = true

                    synchronized(channel)
                    
                        if(channel.isClosedForSend)
                        
                            b = false
                        
                        println("consumer$threadID: channel.isClosedForSend = $channel.isClosedForSend")
                        println("consumer$threadID: channel.isClosedForReceive = $channel.isClosedForReceive")
                    
                    if(!b)
                    
                        break
                    

                    var i = channel.receive()
                    channel.receiveOrNull()
                    println("consumer$threadID: received $i")
                

                println("consumer$threadID: channel was closed")
            

            launch 
                for (i in 0..20)
                
                    delay(200)
                    println("consumer$threadID: loop $i")
                
            
        
    


class ProducerThread(val threadID: Int, val channel: Channel<Int>): Thread()

    override fun run() 
        runBlocking 
            launch 
                println("producer$threadID: run in thread $Thread.currentThread().name")
//                    var i = 0

                for(i 深入理解Kotlin协程协程的分类与线程的区别

深潜Koltin协程:协程的取消

Kotlin 协程的作用域构建器 coroutineScope与runBlocking 与supervisorScope,协程同步运行,协程挂掉的时候其他协程如何不被挂掉。

谈谈我对 Kotlin 中协程的理解

kotlin协程的理解

kotlin协程的生命周期与jetpack组件绑定