[Kotlin Tutorials 22] 协程中的异常处理

Posted 圣骑士Wind的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Kotlin Tutorials 22] 协程中的异常处理相关的知识,希望对你有一定的参考价值。

Kotlin协程中的异常处理讨论

协程中的异常处理

Parent-Child关系

如果一个coroutine抛出了异常, 它将会把这个exception向上抛给它的parent, 它的parent会做以下三件事情:

  • 取消其他所有的children.
  • 取消自己.
  • 把exception继续向上传递.

这是默认的异常处理关系, 取消是双向的, child会取消parent, parent会取消所有child.

catch不住的exception

看这个代码片段:

fun main() 
    val scope = CoroutineScope(Job())
    try 
        scope.launch 
            throw RuntimeException()
        
     catch (e: Exception) 
        println("Caught: $e")
    

    Thread.sleep(100)

这里的异常catch不住了.
会直接让main函数的主进程崩掉.

这是因为和普通的异常处理机制不同, coroutine中未被处理的异常并不是直接抛出, 而是按照job hierarchy向上传递给parent.

如果把try放在launch里面还行.

默认的异常处理

默认情况下, child发生异常, parent和其他child也会被取消.

fun main() 
    println("start")
    val exceptionHandler = CoroutineExceptionHandler  _, exception ->
        println("CoroutineExceptionHandler got $exception")
    
    val scope = CoroutineScope(Job() + exceptionHandler)

    scope.launch 
        println("child 1")
        delay(1000)
        println("finish child 1")
    .invokeOnCompletion  throwable ->
        if (throwable is CancellationException) 
            println("Coroutine 1 got cancelled!")
        
    

    scope.launch 
        println("child 2")
        delay(100)
        println("child 2 throws exception")
        throw RuntimeException()
    

    Thread.sleep(2000)
    println("end")

打印出:

start
child 1
child 2
child 2 throws exception
Coroutine 1 got cancelled!
CoroutineExceptionHandler got java.lang.RuntimeException
end

SupervisorJob

如果有一些情形, 开启了多个child job, 但是却不想因为其中一个的失败而取消其他, 怎么办? 用SupervisorJob.

比如:

val uiScope = CoroutineScope(SupervisorJob())

如果你用的是scope builder, 那么用supervisorScope.

SupervisorJob改造上面的例子:

fun main() 
    println("start")
    val exceptionHandler = CoroutineExceptionHandler  _, exception ->
        println("CoroutineExceptionHandler got $exception")
    
    val scope = CoroutineScope(SupervisorJob() + exceptionHandler)

    scope.launch 
        println("child 1")
        delay(1000)
        println("finish child 1")
    .invokeOnCompletion  throwable ->
        if (throwable is CancellationException) 
            println("Coroutine 1 got cancelled!")
        
    

    scope.launch 
        println("child 2")
        delay(100)
        println("child 2 throws exception")
        throw RuntimeException()
    
    Thread.sleep(2000)
    println("end")

输出:

start
child 1
child 2
child 2 throws exception
CoroutineExceptionHandler got java.lang.RuntimeException
finish child 1
end

尽管coroutine 2抛出了异常, 另一个coroutine还是做完了自己的工作.

SupervisorJob的特点

SupervisorJob把取消变成了单向的, 只能从上到下传递, 只能parent取消child, 反之不能取消.
这样既顾及到了由于生命周期的结束而需要的正常取消, 又避免了由于单个的child失败而取消所有.

viewModelScope的context就是用了SupervisorJob() + Dispatchers.Main.immediate.

除了把取消变为单向的, supervisorScope也会和coroutineScope一样等待所有child执行结束.

supervisorScope中直接启动的coroutine是顶级coroutine.
顶级coroutine的特性:

  • 可以加exception handler.
  • 自己处理exception.
    比如上面的例子中coroutine child 2可以直接加exception handler.

使用注意事项, SupervisorJob只有两种写法:

  • 作为CoroutineScope的参数传入: CoroutineScope(SupervisorJob()).
  • 使用supervisorScope方法.

把Job作为coroutine builder(比如launch)的参数传入是错误的做法, 不起作用, 因为一个新的coroutine总会assign一个新的Job.

异常处理的办法

try-catch

和普通的异常处理一样, 我们可以用try-catch, 只是注意要在coroutine里面:

fun main() 
    val scope = CoroutineScope(Job())
    scope.launch 
        try 
            throw RuntimeException()
         catch (e: Exception) 
            println("Caught: $e")
        
    

    Thread.sleep(100)

这样就能打印出:

Caught: java.lang.RuntimeException

对于launch, try要包住整块.
对于async, try要包住await语句.

scope function: coroutineScope()

coroutineScope会把其中未处理的exception抛出来.

相比较于这段代码中catch不到的exception:

fun main() 
    val scope = CoroutineScope(Job())
    scope.launch 
        try 
            launch 
                throw RuntimeException()
            
         catch (e: Exception) 
            println("Caught: $e")
        
    
    Thread.sleep(100)

没走到catch里, 仍然是主进程崩溃.

这个exception是可以catch到的:

fun main() 
    val scope = CoroutineScope(Job())
    scope.launch 
        try 
            coroutineScope 
                launch 
                    throw RuntimeException()
                
            
         catch (e: Exception) 
            println("Caught: $e")
        
    

    Thread.sleep(100)

打印出:

Caught: java.lang.RuntimeException

因为这里coroutineScope把异常又重新抛出来了.

注意这里换成supervisorScope可是不行的.

CoroutineExceptionHandler

CoroutineExceptionHandler是异常处理的最后一个机制, 此时coroutine已经结束了, 在这里的处理通常是报告log, 展示错误等.
如果不加exception handler那么unhandled exception会进一步往外抛, 如果最后都没人处理, 那么可能造成进程崩溃.

CoroutineExceptionHandler需要加在root coroutine上.

这是因为child coroutines会把异常处理代理到它们的parent, 后者继续代理到自己的parent, 一直到root.
所以对于非root的coroutine来说, 即便指定了CoroutineExceptionHandler也没有用, 因为异常不会传到它.

两个例外:

  • async的异常在Deferred对象中, CoroutineExceptionHandler也没有任何作用.
  • supervision scope下的coroutine不会向上传递exception, 所以CoroutineExceptionHandler不用加在root上, 每个coroutine都可以加, 单独处理.

通过这个例子可以看出另一个特性: CoroutineExceptionHandler只有当所有child都结束之后才会处理异常信息.

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking 
    val handler = CoroutineExceptionHandler  _, exception -> 
        println("CoroutineExceptionHandler got $exception") 
    
    val job = GlobalScope.launch(handler) 
        launch  // the first child
            try 
                delay(Long.MAX_VALUE)
             finally 
                withContext(NonCancellable) 
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                
            
        
        launch  // the second child
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        
    
    job.join()

输出:

Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
CoroutineExceptionHandler got java.lang.ArithmeticException

如果多个child都抛出异常, 只有第一个被handler处理, 其他都在exception.suppressed字段里.

fun main() = runBlocking 
    val handler = CoroutineExceptionHandler  _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed $exception.suppressed.contentToString()")
    
    val job = GlobalScope.launch(handler) 
        launch 
            try 
                delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
             finally 
                throw ArithmeticException() // the second exception
            
        
        launch 
            delay(100)
            throw IOException() // the first exception
        
        delay(Long.MAX_VALUE)
    
    job.join()

输出:

CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]

单独说一下async

async比较特殊:

  • 作为top coroutine时, 在await的时候try-catch异常.
  • 如果是非top coroutine, async块里的异常会被立即抛出.

例子:

fun main() 
    val scope = CoroutineScope(SupervisorJob())
    val deferred = scope.async 
        throw RuntimeException("RuntimeException in async coroutine")
    

    scope.launch 
        try 
            deferred.await()
         catch (e: Exception) 
            println("Caught: $e")
        
    

    Thread.sleep(100)

这里由于用了SupervisorJob, 所以async是top coroutine.

fun main() 

    val coroutineExceptionHandler = CoroutineExceptionHandler  coroutineContext, exception ->
        println("Handle $exception in CoroutineExceptionHandler")
    

    val topLevelScope = CoroutineScope(SupervisorJob() + coroutineExceptionHandler)
    topLevelScope.launch 
        async 
            throw RuntimeException("RuntimeException in async coroutine")
        
    
    Thread.sleep(100)

当它不是top coroutine时, 异常会被直接抛出.

特殊的CancellationException

CancellationException是特殊的exception, 会被异常处理机制忽略, 即便抛出也不会向上传递, 所以不会取消它的parent.
但是CancellationException不能被catch, 如果它不被抛出, 其实协程没有被成功cancel, 还会继续执行.

CancellationException的透明特性:
如果CancellationException是由内部的其他异常引起的, 它会向上传递, 并且把原始的那个异常传递上去.

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking 
    val handler = CoroutineExceptionHandler  _, exception ->
        println("CoroutineExceptionHandler got $exception")
    
    val job = GlobalScope.launch(handler) 
        val inner = launch  // all this stack of coroutines will get cancelled
            launch 
                launch 
                    throw IOException() // the original exception
                
            
        
        try 
            inner.join()
         catch (e: CancellationException) 
            println("Rethrowing CancellationException with original cause")
            throw e // cancellation exception is rethrown, yet the original IOException gets to the handler
        
    
    job.join()

输出:

Rethrowing CancellationException with original cause
CoroutineExceptionHandler got java.io.IOException

这里Handler拿到的是最原始的IOException.

Further Reading

官方文档:

Android官方文档上链接的博客和视频:

其他:

深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题

热数据通道 Channel

Channel 实际上就是 个并发安全的队列,它可以用来连接协程,实现不同协程的通信,代码如代码清单所示

suspend fun testChannel() 
    val channel = Channel<Int>()  
    var i = 0
    //生产者 发
    val producer = GlobalScope.launch 
        while (true) 
            delay(1000)
            channel.send(i++)
        
    
    //消费者 收
    val consumer = GlobalScope.launch 
        while (true) 
            val value = channel.receive()
            println("received <<<<<<<<<<<<<<<<<< $value")
        
    
    producer.join()
    consumer.join()

上述代码 构造了两个协程 producer 和 consumer, 没有为它们明确指定调度器,所以它们都是采用默认调度器,在 Java 平台上就是基于线程池实现的 Default。 它们可以运行在不同的线程上,也可以运行在同一个线程上,具体执行流程如图 6-2 所示。

producer 每隔 1s 向 Channel 发送 1 个数,consumer 一直读取 channel 来获取这个数字并打印,显然发送端比接收端更慢,在没有值可以读到的时候, receive 是挂起的,直到有新元素到达。

这么看来,receive 一定是一个挂起函数,那么 send 呢?

你会发现 send 也是挂起函数。发送端为什么会挂起?以我们熟知的 BlockingQueue 为例,当我们往其中添加元素的时候,元素在队列里实际上是占用了空间的,如果这个队列空间不足,那么再往其中添加元素的时候就会出现两种情况:

  • 阻塞.等待队列腾出空间
  • 异常,拒绝添加元素。

send 也会面临同样的问题, Channel 实际上就是个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用 receive 并取走元素, send 就需要挂起,等待接收者取走数据之后再写入 Channel 。

Channel缓冲区

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) 
        RENDEZVOUS -> 
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        
        CONFLATED -> 
            require(onBufferOverflow == BufferOverflow.SUSPEND) 
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            
            ConflatedChannel(onUndeliveredElement)
        
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        else -> 
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        
    

我们构造 Channel 的时候调用了一个名为 Channel 的函数,虽然两个 “Channel" 起来是 样的,但它却确实不是 Channel 的构造函数。在 Kotlin 中我们经常定义 一个顶级函数来伪装成同名类型的构造器,这本质上就是工厂函数。Channel 函数有一个参数叫 capacity, 该参数用于指定缓冲区的容量,RENDEZVOUS 默认值为 0,RENDEZVOUS 本意就是描述“不见不散"的场景, 如果不调用 receive, send 就会一直挂起等待。如果把上面代码中consumer的channel.receive()注释掉,则producer中send方法第一次调用就会挂起。

  • Channel(Channel.RENDEZVOUS ) 的方式是有人接收才会继续发,边收边发,如果没有接受的,则发送者会挂起等待
  • Channel(Channel.UNLIMITED ) 的方式是发送者发送完毕,就直接返回,不管有没有接受者。
  • Channel(Channel.CONFLATED ) 的方式是不管发送者发了多少个,接受者只能收到最后一个,也是发送完就返回了,不管有没有接受者。
  • Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定buffer大小。
  • Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据

Channel的迭代

Channel可以通过迭代器迭代访问:

GlobalScope.launch 
    val iterator = channel.iterator()
    while (iterator.hasNext())  // 挂起点
          println("received <<<<<<<<<<<<<<<<<< $iterator.next()")
    

其中,iterator.hasNext()挂起函数,在判断是否有下个元素的时候就需要去Channel 中读取元素了,这个写法自然可以简化成 for-in

GlobalScope.launch 
    for (element in channel)  
          println("received <<<<<<<<<<<<<<<<<< $element")
    

生产者和消费者协程构造器

我们可以通过 produce 方法启动一个生产者协程,并返回 ReceiveChannel,其他协程就可以用这个 Channel 来接收数据了。反过来,我们可以用 actor 启动一个消费者协程。

suspend fun producer() 
    val receiveChannel = GlobalScope.produce 
        for (i in 0..3) 
            send(i)
            println("send --------------> $i")
        
    

    val consumer = GlobalScope.launch 
        for (i in receiveChannel) 
            println("received <<<<<<<<<<<<<<<< $i")
        
    

    consumer.join()

suspend fun consumer() 
    val sendChannel = GlobalScope.actor<Int> 
        for (i in this) 
            println("received <<<<<<<<<<<<<<<< $i")
        
    

    val producer = GlobalScope.launch 
        for (i in 0..3) 
            sendChannel.send(i)
            println("send --------------> $i")
        
    

    producer.join()
 

使用这两种构造器也可以指定Channel对应的缓冲区类型,如:

val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) 
    for (i in 0..3) 
        send(i)
    

ReceiveChannel SendChannel 都是 Channel 的父接口,前者定义了 receive, 后者定义了 send, Channel 也因此既可以使用 receive 又可以使用 send。
通过 produce 和 actor 这两个协程构造器启动的协程也与返回的 Channel 自然地绑定到了一起,因此在协程结束时返回的 Channel 也会被立即关闭。

produce 为例,它构造出了一个 ProducerCoroutine 对象,该对象也是 Job 的实现:

private class ProducerCoroutine<E>(
    parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> 
    override val isActive: Boolean
        get() = super.isActive

    override fun onCompleted(value: Unit) 
        _channel.close() // 协程完成时关闭channel
    

    override fun onCancelled(cause: Throwable, handled: Boolean) 
        val processed = _channel.close(cause) // 协程取消时关闭channel
        if (!processed && !handled) handleCoroutineException(context, cause)
    

注意,在协程完成和取消的方法调用 中, 对应的_channel 都会被关闭。produc actor 这两个构造器看上去都很有用,不过目前前者仍被标记为 Experimental CoroutinesApi, 后者则被标记为 ObsoleteCoroutinesApi, 后续仍然可能会有较大的改动。

Channel的关闭

对千一个 Channel 如果我们调用了它的 close() 方法,它会立即停止接收新元素,也就是说这时候它的 isClosedForSend 会立即返回 true 而由于 Channel 缓冲区的存在, 这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true

一说到关闭,我们很容易想到 I/0, 如果不关闭 1/0 可能会造成资源泄露。那么 Channel 关闭有什么意义呢?前面我们提到过,Channel 内部的资源其实就是个缓冲区,如果我们创建 Channel 而不去关闭它。虽然并不会造成系统资源的泄露,但却会让接收端一直处千挂起等待的状态,因此一定要在适当的时机关闭 Channel。

究竟由谁来关闭Channel,需要根据业务场景由发送端和接受端之间进行协商决定。如果发送端关闭了Channel,接受端还在调用receive方法,会导致异常,这时就需要进行异常处理:

suspend fun testChannel2() 
    val channel = Channel<Int>()
    //生产者 发
    val producer = GlobalScope.launch 
        for (i in 0..3) 
            println("sending --------------> $i")
            channel.send(i)
        
        channel.close() // 发送端关闭channel
    
    //消费者 收
    val consumer = GlobalScope.launch 
        try 
            while (true) 
                val value = channel.receive()
//                 val value = channel.receiveCatching() // 这个方法不会抛出异常
                println("received <<<<<<<<<<<<<<<<<< $value")
            
         catch (e : ClosedReceiveChannelException) 
            println("catch ClosedReceiveChannelException: $e.message")
        
    
    producer.join()
    consumer.join()

发送端关闭了Channel,接受端还在调用receive方法,会抛出ClosedReceiveChannelException异常,如果使用receiveCatching()遇到close时就不会抛出异常,但是会使用null作为返回结果。

BroadcastChannel

创建 broadcastCbannel 的方法与创建普通的 Channel 几乎没有区别:

val broadcastChannel = broadcastChannel<Int>(5)

如果要订阅功能,那么只 要调用如下方法

val receiveChannel = broadcastChannel.openSubscription()

这样我们就得到了一个 ReceiveChannel,如果想要想获取订阅的消息,只需要调用它的 receive 函数;如果想要取消订阅则调用 cancel 函数即可。

我们来看一个比较完整的例子,本示例中我们在发送端发 0 1 2, 并启动 3个协程同时接收广播,相关代码如下所示。

suspend fun broadcast() 
    //下面几种都可以创建一个BroadcastChannel
    //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    //val broadcastChannel = Channel<Int>(Channel.BUFFERED).broadcast()
    val broadcastChannel = GlobalScope.broadcast 
        for (i in 0..2) 
            send(i)
        
    
    //启动3个子协程作为接受者,每个都能收到
    List(3)  index ->
        GlobalScope.launch 
            val receiveChannel = broadcastChannel.openSubscription() // 订阅
            for (i in receiveChannel) 
                println("[#$index] received: $i")
            
        
    .joinAll()

除了直接创建以外,我们也可以用前面定义的普通 Channel 进行转换,代码如下所示。

// 通过 Channel 实例直接创建广播
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast()

Channel 版本的序列生成器

// 使用channel模拟序列生成器
val channel = GlobalScope.produce 
    println("A")
    send(1)
    println("B")
    send(2)
    println("Done")

for (item in channel) 
    println("get $item")

冷数据流Flow

Sequence中不能调用其他挂起函数,不能设置调度器,只能单线程中使用。而Flow可以支持:

// 序列生成器中不能调用其他挂起函数
sequence 
    (1..3).forEach 
        yield(it)
        delay(100) // ERROR
    

创建Flow

val intFlow = flow 
   (1..3).forEach 
       emit(it)
       delay(100)
   

Flow 也可以设定它运行时所使用的调度器:

intFlow.flowDn(Dispatchers.IO)

通过 flowOn 设置的调度器只对它之前的操作有影响,因此这里意味着 intFlow 的构造逻辑会在 IO 调度器上执行。

最终读取 intFlow 需要调用 collect 函数, 这个函数也是一个挂起函数。我们启动一个协程来消费 intFlow, 代码如下所示

suspend fun testFlows()
    val dispatcher = Executors.newSingleThreadExecutor 
        Thread(it, "MyThread").also  it.isDaemon = true 
    .asCoroutineDispatcher()

    GlobalScope.launch(dispatcher) 
        val intFlow = flow 
            (1..3).forEach 
                emit(it)
                println("$Thread.currentThread().name: emit $it")
                delay(1000)
            
        
        intFlow.flowOn(Dispatchers.IO)
            .collect 
                println("$Thread.currentThread().name: collect $it")
            
    .join()

为了方便区分,我们为协程设置了一个自定义的调度器,它会将协程调度到名叫 MyThread 的线程上,结果如下:

也就是说,collect中的代码运行在Global.launch指定的调度器上,flow… 中的代码运行在 flowOn 指定的调度器上

对比 RxJava 的线程切换

RxJava 也是 个基千响应式编程模型的异步框架,它提供了两个切换调度器的 API, 分别是 subscribeOn observeOn, 其中 subscribeOn 指定的调度器执行被观察者的代码, observeOn 指定调度器运行观察者的代码,观察者最后在observeOn 指定调度器上收集结果。flowOn 就相当于subscribeOn,而 launch的调度器 就相当于 observeOn 指定的调度器。

在一个 Flow 创建出来之后,不消费则不生产,多次消费则多次生产,生产和消费总是相对应的,代码如下所示。

GlobalScope.launch(dispatcher) 
    val intFlow = flow 
        (1..3).forEach 
            emit(it)
            delay(1000)
        
    
    // Flow 可以被重复消费
    intFlow.collect  println(it) 
    intFlow.collect  println(it) 
.join()

消费它会输出 “1, 2, 3", 重复消费它会重复输出“1, 2, 3"。

这一点类似于我们前面提到的序列生成器和 RxJava 的例子,它们也都有自己的消费端。我们创建序列后去迭代它,每次迭代都会创建一个新的迭代器从头开始迭代RxJava Observable 也是如此,每次调用它的 subscribe 都会重新消费一次。

所谓冷数据流,就是只有消费时才会生 的数据流,这一点 Channel 正好相反,Channel 发送端并不依赖于接收端。

异常处理

Flow 的异常处理也比较 接,直接调用 catch 函数即可,如下所示。

如果想要在 Flow 完成时执行逻辑,可以使用 onCompletiononCompletion 用起来类似于 try… catch… finally 中的 finally, 无论前面是否存在异常,它都会被调用,参数 t 则是前面未捕获的异常。

这套处理机制的设计初衷是确保 Flow 操作中异常的透明。因此,直接使用 try - catch - finally 的写法是违反 Flow 的设计原则的。

suspend fun exception()
    flow<Int> 
        emit(1)
        throw ArithmeticException("Div 0")
    .catch t: Throwable ->
        log("caught error: $t")
    .onCompletion  t: Throwable? ->
        log("finally.")
    .flowOn(Dispatchers.IO)
        .collect  log(it) 
// 不推荐直接使用 try - catch - finally 写法
//    flow  // bad!!!
//        try 
//            emit(1)
//            throw ArithmeticException("Div 0")
//         catch (t: Throwable)
//            log("caught error: $t")
//         finally 
//            log("finally.")
//        
//    

我们在 Flow 操作内部使用 try… catch. … finally, 这样的写法后续可能会被禁用。

// Flow 从异常中恢复
flow 
    emit(1)
    throw ArithmeticException("divide 0")
.catch  t : Throwable ->
    println("caught error $t")
    emit(10)

这里我们可以使用 emit 重新生产新元素。细 心的读者一定会发现, emit 定义在 FlowCollector 中,因此只要遇到 Receiver 为 FlowCollector 的函数,我们就可以生产新元素。

末端操作符

前面的例子中,我们用 collect 消费 flow 的数据 collect 是最基本的末端操作符,功能与 RxJava 的 subscribe 类似。

除了 collect 之外,还有其他常见的末端操作符,它们大体分为两类:

  • 集合类型转换操作符,包括 toList toSet
  • 聚合操作符,包括将 flow 规约到单值的 reduce、fold 等操作;还有获得单个元素的操作符,包括 sing、singleOrNull、first 等。

实际上,识别是否为末端操作符 ,还有一个简单方法 :由于 Flow 的消费端一定需要运行在协程中 ,因此末端操作符都是挂起函数。

分离 Flow 的消费和触发

我们除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。这样消费的具体操作就不需要与末端操作符放到一起, collect 函数可以放到其他任意位置调用,例如代码如下所示。

// 分类 Flow 的消费和触发
fun createFlow() = flow<Int> 
    (1..3).forEach 
        emit(it)
        delay(100)
    
.onEach  println(it) 

fun main() 
    GlobalScope.launch 
        createFlow().collect()
    

由此,我们又可以衍生出一种新的消费 Flow 的写法,代码如下所示。

fun main() 
    // 使用协程作用域直接触发 Flow
    createFlow().launchIn(GlobalScope)

其中, launchln 函数只接收 CoroutineScope 类型的参数。

Flow的取消

Flow 没有提供取消操作的方法,因为并不需要。
我们前面已经介绍了 Flow 的消费依赖于 collect 这样的末端操作符,而它们又必须在协程中调用,因此 Flow 的取消主要依赖于末端操作符所在的协程的状态。

以上是关于[Kotlin Tutorials 22] 协程中的异常处理的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Kotlin 协程中使用 Sqldelight

Kotlin 协程协程异常处理 ④ ( Android 协程中出现异常导致应用崩溃 | Android 协程中使用协程异常处理器捕获异常 | Android 全局异常处理器 )

Kotlin 协程协程异常处理 ④ ( Android 协程中出现异常导致应用崩溃 | Android 协程中使用协程异常处理器捕获异常 | Android 全局异常处理器 )

Kotlin 协程协程中的多路复用技术 ① ( 多路复用技术 | await 协程多路复用 | Channel 通道多路复用 )

Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )

我们可以在 Android Kotlin 的单个协程中下载多个文件吗