Kotlin协程:Flow基础原理

Posted 嘴巴吃糖了

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程:Flow基础原理相关的知识,希望对你有一定的参考价值。

本文分析示例代码如下:

launch(Dispatchers.Main) 
    flow 
        emit(1)
        emit(2)
    .collect 
        delay(1000)

        withContext(Dispatchers.IO) 
            Log.d("liduo", "$it")
        

        Log.d("liduo", "$it")
    

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector<in T> 
    // 可挂起,非线程安全
    public suspend fun emit(value: T)

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow<out T> 

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> 
        override suspend fun emit(value: T) = action(value)
    )

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() 
    override suspend fun collectSafely(collector: FlowCollector<T>) 
        collector.block()
    

SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> 

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) 
        // 创建SafeCollector对象,对collector进行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try 
            // 调用collectSafely方法
            collectSafely(safeCollector)
         finally 
            // 释放拦截的续体
            safeCollector.releaseIntercepted()
        
    

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame 

    ...
    // 保存上下文中元素数量,用于检查上下文是否变化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0)  count, _ -> count + 1 
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 执行结束后的续体
    private var completion: Continuation<Unit>? = null

    // 协程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 挂起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? 
        result.onFailure  lastEmissionContext = DownstreamExceptionElement(it) 
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    

    // 释放拦截的续体
    public actual override fun releaseIntercepted() 
        super.releaseIntercepted()
    

    // 发射数据
    override suspend fun emit(value: T) 
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
            try 
                // 调用重载的方法
                emit(uCont, value)
             catch (e: Throwable) 
                 // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            
        
    

    // 重载的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? 
        // 从续体中获取上下文
        val currentContext = uCont.context
        // 保证当前协程的Job是active的
        currentContext.ensureActive()
        // 获取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文发生变化
        if (previousContext !== currentContext) 
            // 检查上下文是否发生异常
            checkContext(currentContext, previousContext, value)
        
        // 保存续体
        completion = uCont
        // 调用emitFun方法,传入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    

    // 检查上下文变化,防止并发
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) 
        // 如果上次执行过程中发生了异常
        if (previousContext is DownstreamExceptionElement) 
            // 抛出异常
            exceptionTransparencyViolated(previousContext, value)
        
        // 检查上下文是否发生变化,如果变化,则抛出异常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    

    // 用于抛出异常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) 
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception $exception.e, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) 
   InlineMarker.mark(0);
   // 核心执行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> 
        override suspend fun emit(value: T) = action(value)
    )

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) 
    // 获取当前suspend方法续体
    return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
        try 
            // 调用重载的方法
            emit(uCont, value)
         catch (e: Throwable) 
            // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 抛出异常
            throw e
        
    

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result<Any?>): Any? 
    // 尝试获取异常
    result.onFailure  lastEmissionContext = DownstreamExceptionElement(it) 
    // 如果没有异常,则恢复flow方法续体的执行
    completion?.resumeWith(result as Result<Unit>)
    // 返回挂起标识,这里挂起的是消费过程
    return COROUTINE_SUSPENDED

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

作者:李萧蝶
链接:https://juejin.cn/post/7137647612286468132

三、Kotlin学习资料

一. Kotlin入门教程指南

二. 高级Kotlin强化实战

三. android版Kotlin协程入门进阶实战

《Kotlin入门教程指南》

第一章 Kotlin 入门教程指南

  • 前言

第二章 概述

  • 使用 Kotlin 进行服务器端开发
  • 使用 Kotlin 进行 Android 开发
  • Kotlin javascript 概述
  • Kotlin/Native 用于原生开发
  • 用于异步编程等场景的协程
  • Kotlin 1.1 的新特性
  • Kotlin 1.2 的新特性
  • Kotlin 1.3 的新特性

第三章 开始

  • 基本语法
  • 习惯用法
  • 编码规范

第四章 基础

  • 基本类型
  • 控制流:if、when、for、while
  • 返回和跳转

第五章 类与对象

  • 类与继承
  • 属性与字段
  • 接口
  • 可见性修饰符
  • 扩展
  • 数据类
  • 密封类
  • 泛型
  • 嵌套类与内部类
  • 枚举类
  • 对象表达式与对象声明
  • Inline classes
  • 委托

第六章 函数与 Lambda 表达式

  • 函数
  • 高阶函数与 lambda 表达式
  • 内联函数

第七章 其他

  • 解构声明
  • 集合:List、Set、Map
  • 区间
  • 类型的检查与转换“is”与“as”
  • This 表达式
  • 相等性
  • 操作符重载
  • 空安全
  • 异常
  • 注解
  • 反射
  • 类型安全的构建器
  • 类型别名
  • 多平台程序设计
  • 关键字与操作符

第八章 Java 互操作与 JavaScript

  • 在 Kotlin 中调用 Java 代码
  • Java 中调用 Kotlin
  • JavaScript 动态类型
  • Kotlin 中调用 JavaScript
  • JavaScript 中调用 Kotlin
  • JavaScript 模块
  • JavaScript 反射
  • JavaScript DCE

第九章 协程

  • 协程基础
  • 取消与超时
  • 通道 (实验性的)
  • 组合挂起函数
  • 协程上下文与调度器
  • 异常处理
  • select 表达式(实验性的)
  • 共享的可变状态与并发

第十章 工具

  • 编写 Kotlin 代码文档
  • Kotlin 注解处理
  • 使用 Gradle
  • 使用 Maven
  • 使用 Ant
  • Kotlin 与 OSGi
  • 编译器插件
  • 不同组件的稳定性

第十一章 常见问题总结

  • FAQ
  • 与 Java 语言比较
  • 与 Scala 比较【官方已删除】

高级Kotlin强化实战

第一章 Kotlin 入门教程

  • Kotlin 概述
  • Kotlin 与 Java 比较
  • 巧用 Android Studio
  • 认识 Kotlin 基本类型
  • 走进 Kotlin 的数组
  • 走进 Kotlin 的集合
  • 完整代码
  • 基础语法

第二章 Kotlin 实战避坑指南

  • 方法入参是常量,不可修改
  • 不要 Companion、INSTANCE?
  • Java 重载,在 Kotlin 中怎么巧妙过渡一下?
  • Kotlin 中的判空姿势
  • Kotlin 复写 Java 父类中的方法
  • Kotlin “狠”起来,连TODO都不放过!
  • is、as` 中的坑
  • Kotlin 中的 Property 的理解
  • also 关键字
  • takeIf 关键字
  • 单例模式的写法

第三章 项目实战《Kotlin Jetpack 实战》

  • 从一个膜拜大神的 Demo 开始
  • Kotlin 写 Gradle 脚本是一种什么体验?
  • Kotlin 编程的三重境界
  • Kotlin 高阶函数
  • Kotlin 泛型
  • Kotlin 扩展
  • Kotlin 委托
  • 协程“不为人知”的调试技巧
  • 图解协程:suspend

Android版Kotlin协程入门进阶实战

第一章 Kotlin协程的基础介绍

  • 协程是什么
  • 什么是Job 、Deferred 、协程作用域
  • Kotlin协程的基础用法

第二章 kotlin协程的关键知识点初步讲解

  • 协程调度器
  • 协程上下文
  • 协程启动模式
  • 协程作用域
  • 挂起函数

第三章 kotlin协程的异常处理

  • 协程异常的产生流程
  • 协程的异常处理

第四章 kotlin协程在Android中的基础应用

  • Android使用kotlin协程
  • 在Activity与Framgent中使用协程
  • ViewModel中使用协程
  • 其他环境下使用协程

第五章 kotlin协程的网络请求封装

  • 协程的常用环境
  • 协程在网络请求下的封装及使用
  • 高阶函数方式
  • 多状态函数返回值方式

第六章 深入kotlin协程原理(一)

  • suspend的花花肠子
  • 藏在身后的-Continuation
  • 村里的希望-SuspendLambda

第七章 深入kotlin协程原理(二)

  • 协程的那些小秘密
  • 协程的创建过程
  • 协程的挂起与恢复
  • 协程的执行与状态机

第八章 Kotlin Jetpack 实战

  • 从一个膜拜大神的 Demo 开始
  • Kotlin 写 Gradle 脚本是一种什么体验?
  • Kotlin 编程的三重境界
  • Kotlin 高阶函数
  • Kotlin 泛型
  • Kotlin 扩展
  • Kotlin 委托
  • 协程“不为人知”的调试技巧
  • 图解协程原理

第九章 Kotlin + 协程 + Retrofit + MVVM优雅的实现网络请求

  • 项目配置
  • 实现思路
  • 协程实现
  • 协程 + ViewModel + LiveData实现
  • 后续优化
  • 异常处理
  • 更新Retrofit 2.6.0

Kotlin入门教程指南》完整版可点击文末CSDN官方认证卡片获取!

最后

Kotlin 通常被视为下一个 Java,在 StackOverflow 的 2019 年开发人员调查中,Kotlin 成为第四大“最受欢迎”和第五大“想要”的编程语言,在所有移动编程语言中排名最高。Android 开发由 Java 转 Kotlin 早已势不可挡。

希望这篇文章会对你学习和掌握 Kotlin 语言有所帮助,也希望各位读者能在Android开发的进阶之路上走得长远,共勉!!

以上是关于Kotlin协程:Flow基础原理的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin协程之flow工作原理

Kotlin协程之flow工作原理

Koltin协程:Flow的触发与消费

社区说|Kotlin Flow 的原理与设计哲学

深潜Kotlin协程(十九):Flow 概述

深入kotlin - Flow 进阶