Kotlin协程:Flow基础原理
Posted 嘴巴吃糖了
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程:Flow基础原理相关的知识,希望对你有一定的参考价值。
本文分析示例代码如下:
launch(Dispatchers.Main)
flow
emit(1)
emit(2)
.collect
delay(1000)
withContext(Dispatchers.IO)
Log.d("liduo", "$it")
Log.d("liduo", "$it")
一.Flow的创建
在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码如下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:
public interface FlowCollector<in T>
// 可挂起,非线程安全
public suspend fun emit(value: T)
调用flow方法,会返回一个Flow接口指向的对象,代码如下:
public interface Flow<out T>
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。
二.Flow的消费
在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T>
override suspend fun emit(value: T) = action(value)
)
这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector
对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。
1.SafeFlow类
根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>()
override suspend fun collectSafely(collector: FlowCollector<T>)
collector.block()
SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。
2.AbstractFlow类
AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>
// 核心方法
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>)
// 创建SafeCollector对象,对collector进行包裹
val safeCollector = SafeCollector(collector, coroutineContext)
try
// 调用collectSafely方法
collectSafely(safeCollector)
finally
// 释放拦截的续体
safeCollector.releaseIntercepted()
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。
3. SafeCollector类
当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。
SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame
...
// 保存上下文中元素数量,用于检查上下文是否变化
@JvmField
internal actual val collectContextSize = collectContext.fold(0) count, _ -> count + 1
// 保存上一次的上下文
private var lastEmissionContext: CoroutineContext? = null
// 执行结束后的续体
private var completion: Continuation<Unit>? = null
// 协程上下文
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
// 挂起的核心方法
override fun invokeSuspend(result: Result<Any?>): Any?
result.onFailure lastEmissionContext = DownstreamExceptionElement(it)
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
// 释放拦截的续体
public actual override fun releaseIntercepted()
super.releaseIntercepted()
// 发射数据
override suspend fun emit(value: T)
// 获取当前suspend方法续体
return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
try
// 调用重载的方法
emit(uCont, value)
catch (e: Throwable)
// 出现异常时,将异常封装成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 抛出异常
throw e
// 重载的emit方法
private fun emit(uCont: Continuation<Unit>, value: T): Any?
// 从续体中获取上下文
val currentContext = uCont.context
// 保证当前协程的Job是active的
currentContext.ensureActive()
// 获取上次的上下文
val previousContext = lastEmissionContext
// 如果前后上下文发生变化
if (previousContext !== currentContext)
// 检查上下文是否发生异常
checkContext(currentContext, previousContext, value)
// 保存续体
completion = uCont
// 调用emitFun方法,传入collector,value,continuation
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
// 检查上下文变化,防止并发
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
)
// 如果上次执行过程中发生了异常
if (previousContext is DownstreamExceptionElement)
// 抛出异常
exceptionTransparencyViolated(previousContext, value)
// 检查上下文是否发生变化,如果变化,则抛出异常
checkContext(currentContext)
lastEmissionContext = currentContext
// 用于抛出异常
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?)
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception $exception.e, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
emit方法最终会调用emitFun方法方法,代码如下:
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation)
InlineMarker.mark(0);
// 核心执行
Object var10000 = p1.emit(p2, continuation);
InlineMarker.mark(2);
InlineMarker.mark(1);
return var10000;
可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。
而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T>
override suspend fun emit(value: T) = action(value)
)
调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。
通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。
4.消费过程中的挂起
如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED
对象,suspendCoroutineUninterceptedOrReturn
方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:
override suspend fun emit(value: T)
// 获取当前suspend方法续体
return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
try
// 调用重载的方法
emit(uCont, value)
catch (e: Throwable)
// 出现异常时,将异常封装成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 抛出异常
throw e
当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun
可以知道,这里传入的续体为this,也就是当前的SafeCollector
类对象,代码如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:
在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:
override fun invokeSuspend(result: Result<Any?>): Any?
// 尝试获取异常
result.onFailure lastEmissionContext = DownstreamExceptionElement(it)
// 如果没有异常,则恢复flow方法续体的执行
completion?.resumeWith(result as Result<Unit>)
// 返回挂起标识,这里挂起的是消费过程
return COROUTINE_SUSPENDED
在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:
作者:李萧蝶
链接:https://juejin.cn/post/7137647612286468132
三、Kotlin学习资料
一. Kotlin入门教程指南
二. 高级Kotlin强化实战
三. android版Kotlin协程入门进阶实战
《Kotlin入门教程指南》
第一章 Kotlin 入门教程指南
- 前言
第二章 概述
- 使用 Kotlin 进行服务器端开发
- 使用 Kotlin 进行 Android 开发
- Kotlin javascript 概述
- Kotlin/Native 用于原生开发
- 用于异步编程等场景的协程
- Kotlin 1.1 的新特性
- Kotlin 1.2 的新特性
- Kotlin 1.3 的新特性
第三章 开始
- 基本语法
- 习惯用法
- 编码规范
第四章 基础
- 基本类型
- 包
- 控制流:if、when、for、while
- 返回和跳转
第五章 类与对象
- 类与继承
- 属性与字段
- 接口
- 可见性修饰符
- 扩展
- 数据类
- 密封类
- 泛型
- 嵌套类与内部类
- 枚举类
- 对象表达式与对象声明
- Inline classes
- 委托
第六章 函数与 Lambda 表达式
- 函数
- 高阶函数与 lambda 表达式
- 内联函数
第七章 其他
- 解构声明
- 集合:List、Set、Map
- 区间
- 类型的检查与转换“is”与“as”
- This 表达式
- 相等性
- 操作符重载
- 空安全
- 异常
- 注解
- 反射
- 类型安全的构建器
- 类型别名
- 多平台程序设计
- 关键字与操作符
第八章 Java 互操作与 JavaScript
- 在 Kotlin 中调用 Java 代码
- Java 中调用 Kotlin
- JavaScript 动态类型
- Kotlin 中调用 JavaScript
- JavaScript 中调用 Kotlin
- JavaScript 模块
- JavaScript 反射
- JavaScript DCE
第九章 协程
- 协程基础
- 取消与超时
- 通道 (实验性的)
- 组合挂起函数
- 协程上下文与调度器
- 异常处理
- select 表达式(实验性的)
- 共享的可变状态与并发
第十章 工具
- 编写 Kotlin 代码文档
- Kotlin 注解处理
- 使用 Gradle
- 使用 Maven
- 使用 Ant
- Kotlin 与 OSGi
- 编译器插件
- 不同组件的稳定性
第十一章 常见问题总结
- FAQ
- 与 Java 语言比较
- 与 Scala 比较【官方已删除】
《高级Kotlin强化实战》
第一章 Kotlin 入门教程
- Kotlin 概述
- Kotlin 与 Java 比较
- 巧用 Android Studio
- 认识 Kotlin 基本类型
- 走进 Kotlin 的数组
- 走进 Kotlin 的集合
- 完整代码
- 基础语法
第二章 Kotlin 实战避坑指南
- 方法入参是常量,不可修改
- 不要 Companion、INSTANCE?
- Java 重载,在 Kotlin 中怎么巧妙过渡一下?
- Kotlin 中的判空姿势
- Kotlin 复写 Java 父类中的方法
- Kotlin “狠”起来,连TODO都不放过!
- is、as` 中的坑
- Kotlin 中的 Property 的理解
- also 关键字
- takeIf 关键字
- 单例模式的写法
第三章 项目实战《Kotlin Jetpack 实战》
- 从一个膜拜大神的 Demo 开始
- Kotlin 写 Gradle 脚本是一种什么体验?
- Kotlin 编程的三重境界
- Kotlin 高阶函数
- Kotlin 泛型
- Kotlin 扩展
- Kotlin 委托
- 协程“不为人知”的调试技巧
- 图解协程:suspend
《Android版Kotlin协程入门进阶实战》
第一章 Kotlin协程的基础介绍
- 协程是什么
- 什么是Job 、Deferred 、协程作用域
- Kotlin协程的基础用法
第二章 kotlin协程的关键知识点初步讲解
- 协程调度器
- 协程上下文
- 协程启动模式
- 协程作用域
- 挂起函数
第三章 kotlin协程的异常处理
- 协程异常的产生流程
- 协程的异常处理
第四章 kotlin协程在Android中的基础应用
- Android使用kotlin协程
- 在Activity与Framgent中使用协程
- ViewModel中使用协程
- 其他环境下使用协程
第五章 kotlin协程的网络请求封装
- 协程的常用环境
- 协程在网络请求下的封装及使用
- 高阶函数方式
- 多状态函数返回值方式
第六章 深入kotlin协程原理(一)
- suspend的花花肠子
- 藏在身后的-Continuation
- 村里的希望-SuspendLambda
第七章 深入kotlin协程原理(二)
- 协程的那些小秘密
- 协程的创建过程
- 协程的挂起与恢复
- 协程的执行与状态机
第八章 Kotlin Jetpack 实战
- 从一个膜拜大神的 Demo 开始
- Kotlin 写 Gradle 脚本是一种什么体验?
- Kotlin 编程的三重境界
- Kotlin 高阶函数
- Kotlin 泛型
- Kotlin 扩展
- Kotlin 委托
- 协程“不为人知”的调试技巧
- 图解协程原理
第九章 Kotlin + 协程 + Retrofit + MVVM优雅的实现网络请求
- 项目配置
- 实现思路
- 协程实现
- 协程 + ViewModel + LiveData实现
- 后续优化
- 异常处理
- 更新Retrofit 2.6.0
《Kotlin入门教程指南》完整版可点击文末CSDN官方认证卡片获取!
最后
Kotlin 通常被视为下一个 Java,在 StackOverflow 的 2019 年开发人员调查中,Kotlin 成为第四大“最受欢迎”和第五大“想要”的编程语言,在所有移动编程语言中排名最高。Android 开发由 Java 转 Kotlin 早已势不可挡。
希望这篇文章会对你学习和掌握 Kotlin 语言有所帮助,也希望各位读者能在Android开发的进阶之路上走得长远,共勉!!
以上是关于Kotlin协程:Flow基础原理的主要内容,如果未能解决你的问题,请参考以下文章