Kotlin学习手记——协程进阶

Posted 川峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin学习手记——协程进阶相关的知识,希望对你有一定的参考价值。

作用域:

顶级:


coroutineScope表示协同作用域,coroutineScope内部的协程出现异常可以挂掉外部协程,会向外部传播,外部协程挂掉也会挂掉子协程,即双向传播。

supervisorScope表示主从作用域,supervisorScope内部的协程挂掉不会影响外部的协程继续运行,它就像一道防火墙,隔离了异常,保证程序健壮,但是如果外部协程挂掉还是可以取消子协程的,即单向传播。




简单总结就是,主从关系:无法坑爹,爹可以坑儿子。协同关系:可以坑爹,可以坑儿子,互相坑。




如果是应用的话,主要掌握框架级别的使用即可,语言级别的支持api来源于标准库,写起来比较麻烦也非常难理解。




这里launch会进行一次调度 ,delay会进行一次调度,每次调度完成会执行一次resume, 最终协程体执行完毕会执行一次resume, 所以内部有n个挂起点的协程体会执行n+2次resume.

DEFAULT 立即开始调度 和 UNDISPATCHED 立即开始执行协程体,这两个含义的区别是 DEFAULT 只是立即启动协程执行可能是异步的,而后者是直接执行协程体中的代码了。LAZY 是先创建协程体,然后在未来的某个时刻才去启动执行。







UNDISPATCHED 立即开始执行协程体,如果遇到挂起点,就切回主流程了,后面的协程体继续执行在单独的调度器。

import kotlinx.coroutines.*


@ExperimentalCoroutinesApi
suspend fun main() 
    println("start")
    testDefaultMode()
//    testAtomicMode()
//    testLazyMode()
//    testUNDISPATCHEDMode()
    println("finish")


suspend fun testDefaultMode() 
    val defaultMode = GlobalScope.launch(start = CoroutineStart.DEFAULT) 
        println("aaa")
        delay(3000)
        println("bbb")
    
    println("222")
    defaultMode.join()


@ExperimentalCoroutinesApi
suspend fun testAtomicMode() 
    val defaultMode = GlobalScope.launch(start = CoroutineStart.ATOMIC) 
        println("aaa")
        delay(3000)
        println("bbb")
    
    println("222")
    defaultMode.join()


suspend fun testLazyMode() 
    val defaultMode = GlobalScope.async(start = CoroutineStart.LAZY) 
        println("aaa")
        delay(3000)
        println("bbb")
    
    println("222")
    defaultMode.await()


@ExperimentalCoroutinesApi
suspend fun testUNDISPATCHEDMode() 
    val defaultMode = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) 
        println("aaa")
        delay(3000)
        println("bbb")
    
    println("222")
    defaultMode.join()


Default和IO线程的区别,IO内部多了一个队列的维护



回调转协程的完整写法:

import com.bennyhuo.kotlin.coroutines.advanced.common.gitHubServiceApi
import kotlinx.coroutines.suspendCancellableCoroutine
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun <T> Call<T>.await(): T = suspendCancellableCoroutine  //可取消
    continuation ->
    continuation.invokeOnCancellation 
        cancel() //调用retrofit的取消方法
    

    enqueue(object: Callback<T> 
        override fun onFailure(call: Call<T>, t: Throwable) 
            continuation.resumeWithException(t)
        

        override fun onResponse(call: Call<T>, response: Response<T>) 
            response.takeIf  it.isSuccessful ?.body()?.also continuation.resume(it) 
                ?: continuation.resumeWithException(HttpException(response))
        

    )




suspend fun main() 
    val user = gitHubServiceApi.getUserCallback("flycumt").await()
    println(user)

也可以不自己写,retrofit的api中本身有实现await()方法,awaitResponse()方法等。

CompletableFuture 添加回调的写法:

import com.bennyhuo.kotlin.coroutines.advanced.utils.log
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun main() 
    val result = CompletableFuture.supplyAsync 
        3
    .await()

    log(result)


suspend fun <T> CompletableFuture<T>.await(): T 
    if(isDone)
        try 
            return get()
         catch (e: ExecutionException) 
            throw e.cause ?: e
        
    
    return suspendCancellableCoroutine  //可取消
        cancellableContinuation ->
        cancellableContinuation.invokeOnCancellation 
            cancel(true) //取消
        

        whenComplete  value, throwable ->
            if(throwable == null)
                cancellableContinuation.resume(value)
             else 
                cancellableContinuation.resumeWithException(throwable.cause ?: throwable)
            
        
    

CompletableFuture本身也有实现await()方法。

模仿给Handler扩展添加可取消的支持:

suspend fun <T> Handler.run(block: () -> T) = suspendCoroutine<T>  continuation ->
    post 
        try 
            continuation.resume(block())
         catch (e: Exception) 
            continuation.resumeWithException(e)
        
    


suspend fun <T> Handler.runDelay(delay: Long, block: () -> T) = suspendCancellableCoroutine<T>  continuation ->

    val message = Message.obtain(this)  //Message obtain(Handler h, Runnable callback)
        try 
            continuation.resume(block())
         catch (e: Exception)
            continuation.resumeWithException(e)
        
    .also 
        it.obj = continuation //message.obj
    

    continuation.invokeOnCancellation 
        removeCallbacksAndMessages(continuation) //通过Handler的removeCallbacksAndMessages方法来取消回调, 参数就是前面设置的message.obj的值
    

    sendMessageDelayed(message, delay)



suspend fun main() 
    Looper.prepareMainLooper()

    GlobalScope.launch 
        val handler = Handler(Looper.getMainLooper())
        val result = handler.run  "Hello" 
        val delayedResult = handler.runDelay(5000) "World" 
        log(result, delayedResult)
        Looper.getMainLooper().quit()
    

    Looper.loop()

这个例子的主要意图是,Hanlder可以通过定义扩展函数的方式来延时获取一些东西,比如Activity刚创建的时候,拿不到view的宽和高,就可以使用这种方法。

上面三个例子主要是针对可取消的写法,如果实际用,不用自己写,直接导库就行。



其中CONFLATED比较适合用于状态更新,比如进度条的进度,因为它总是只取最新的。



关闭后再发送会抛异常:

channel关闭后,channel中的数据仍然可以被接受,只有当channel中的数据消费完了,isClosedForReceive才为true.

suspend fun main() 
    basics()


suspend fun basics() 
    val channel = Channel<Int>(Channel.RENDEZVOUS)
//    val channel = Channel<Int>(Channel.UNLIMITED)
//    val channel = Channel<Int>(Channel.CONFLATED)
//    val channel = Channel<Int>(Channel.BUFFERED)
//    val channel = Channel<Int>(1)

    //生产者 发
    val producer = GlobalScope.launch 
        for (i in 0..3) 
            log("sending", i)
            channel.send(i)
            log("sent", i)
        
        channel.close()
    

    //消费者 收
    val consumer = GlobalScope.launch 
        while (!channel.isClosedForReceive) 
            log("receiving")
            val value = channel.receiveOrNull()
            log("received", value)
        
    

    producer.join()
    consumer.join()


Channel(Channel.RENDEZVOUS ) 的方式是发一个收一个,边发边收,如果没有接受的,发送者会挂起等待,输出如下:

Channel(Channel.UNLIMITED ) 的方式是全部发送完毕,才会接收到,先发后收,发送者发送完就返回了,不管有没有接受者,输出如下:
Channel(Channel.CONFLATED ) 的方式是不管发了多少个,只能收到最后一个,也是发送完就返回了,不管有没有接受者,输出如下:

Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定buffer大小,输出如下:

Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据,


channel接受数据的时候可以直接当成迭代器使用:

suspend fun iterateChannel() 
    val channel = Channel<Int>(Channel.UNLIMITED)

    val producer = GlobalScope.launch 
        for (i in 0..3) 
            log("sending", i)
            channel.send(i)
            log("sent", i)
        
        channel.close()
    

    val consumer = GlobalScope.launch 
        for (i in channel) 
            log("received: ", i)
        
    

    producer.join()
    consumer.join()

suspend fun producer() 
    val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) 
        for (i in 0..3) 
            log("sending", i)
            send(i)
            log("sent", i)
        
    

    val consumer = GlobalScope.launch 
        for (i in receiveChannel) 
            log("received: ", i)
        
    

    consumer.join()


suspend fun consumer() 
    val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) 
        for (i in this) 
            log("received: ", i)
        
    

    val producer = GlobalScope.launch 
        for (i in 0..3) 
            log("sending", i)
            sendChannel.send(i)
            log("sent", i)
        
    

    producer.join()


suspend fun broadcast() 
    //下面几种都可以创建一个BroadcastChannel
    //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    //val broadcastChannel = Channel<Int>(Channel.BUFFERED).broadcast()
    val broadcastChannel = GlobalScope.broadcast 
        for (i in 0..5) 
            send(i)
        
    

    //启动5个接受者,每个都能收到
    List(5)  index ->
        GlobalScope.launch 
            val receiveChannel = broadcastChannel.openSubscription()
            for (i in receiveChannel) 
                log("[#$index] received: $i")
            
        
    .joinAll()

输出:

> Task :ChannelsKt.main()
21:07:12:924 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 0
21:07:12:924 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: 0
21:07:12:924 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 0
21:07:12:925 [DefaultDispatcher-worker-4 @coroutine#5] [#3] received: 0
21:07:12:925 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 0
21:07:12:944 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 1
21:07:12:943 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 1
21:07:12:943 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: Kotlin协程入门指南+进阶实战,Kotlin语言真的太香了

kotlin 协程万字协程 一篇完成kotlin 协程进阶

Kotlin学习手记——Json解析

史上最详Android版kotlin协程入门进阶实战

Kotlin学习手记——反射

史上最详Android版kotlin协程入门进阶实战