Kotlin:深入理解StateFlow与SharedFlow,StateFlow和LiveData使用差异区分,SharedFlow实现源码解析。

Posted pumpkin的玄学

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin:深入理解StateFlow与SharedFlow,StateFlow和LiveData使用差异区分,SharedFlow实现源码解析。相关的知识,希望对你有一定的参考价值。

本文接上一篇博文:Kotlin:Flow 全面详细指南,附带源码解析。

StateFlow、SharedFlow

先看一下Google对于StateFlow和SharedFlow的介绍🙆‍♀️:

StateFlow and SharedFlow are Flow APIs that enable flows to optimally emit state updates and emit values to multiple consumers.

StateFlow 和 SharedFlow 是 Flow API,它们使流能够以最佳方式发出状态更新并向多个消费者发出值

StateFlow和SharedFlow是一种很特殊的Flow,它们是热流。介绍Flow的时候有说过,它是冷流,再不调用终端操作符的情况下,Flow构建块的代码是不会执行的,每一个消费者调用一次Flow,则构建块的代码会从头到尾执行一次。而热流可以不依赖消费者而存活,可以在流之外生成数据,然后传递给流

真所谓,由简入繁,繁尽至简😮。

我们先从StateFlow和SharedFlow的使用讲起(会稍带涉及到StateFlow和LiveData的区别),而后对其核心实现进行源码分析。

StateFlow使用

StateFlow简介

StateFlow 是一个状态持有者可观察流,它向其收集器发出当前和新状态更新。 当前状态值也可以通过其 value 属性读取。 要更新状态并将其发送到流。

StateFlow提供可读可写仅可读两个版本,这一点和LiveData相似,如下表格🙆‍♀️。

仅可读可读可写
LiveDataLiveDataMutableLiveData
StateFlowStateFlowSharedFlow

StateFlow的用法

StateFlow 非常适合需要维护可观察的可变状态的类。

简单使用举例🌰

    val value = AtomicInteger(0)
    val stateFlow = MutableStateFlow(value.incrementAndGet()) //+1
    stateFlow.value = value.incrementAndGet()// +1
    runBlocking 
        GlobalScope.launch (Dispatchers.Default)
            stateFlow.collect 
                log("receiver value $it")
            
        
        GlobalScope.launch 
            delay(2000)
            stateFlow.collect 
                log("receiver2 value $it")
            
        
        while (true) 
            delay(1000)
            val sendValue = value.incrementAndGet()
            log("sendValue$sendValue")
            stateFlow.emit(sendValue)
        
    
//输出
21:42:34:789 [DefaultDispatcher-worker-1] receiver value 2
21:42:35:781 [main] sendValue3
21:42:35:781 [DefaultDispatcher-worker-1] receiver value 3
21:42:36:784 [main] sendValue4
21:42:36:784 [DefaultDispatcher-worker-1] receiver value 4
21:42:36:784 [DefaultDispatcher-worker-1] receiver2 value 4
21:42:37:786 [main] sendValue5
21:42:37:786 [DefaultDispatcher-worker-1] receiver value 5
21:42:37:786 [DefaultDispatcher-worker-2] receiver2 value 5
...

下面来解释一下🙆‍♀️:

  • 我们创建了AtomicInteger初始值为0,我们调用incrementAndGet获取值,然后传入MutableStateFlow构造方法,构建StateFlow。之后设置StateFlow的属性value = incrementAndGet。注意:上面说的一系列操作都是没有消费者订阅的情况下进行的,之后启动第一个协程进行collect,打印出来的第一个值是2。说明:在没有订阅者的情况下,流依旧进行了执行。
  • 再看代码最下面写了死循环,每隔1秒钟emit一个值给到StateFlow,所以第一个协程手机是没有问题的,也是每隔一秒钟打印一次。看第二个协程,实在延迟两秒中之后,才进行收集,我们看到第二个收集器,是从4开始打印的,且和第一个同步。说明:StateFlow只保留一个值且是最新值;可以允许有多个订阅者,会将此时最新的值,发送给此时订阅的所有存活的订阅者。

LiveData与StateFlow差异对比

liveData可参考:Jetpack:LiveData使用指南,实现原理详细解析!

可以发现基本和LiveData基本一样🤦‍♀️,但是还是有不同的,下面总结一下StateFlow和LiveData的差异性

相同点:😮

  1. 提供可读可写仅可读两个版本
  2. 值是唯一的
  3. 允许被多个观察者观察
  4. 永远会把最新的值给到观察者,即使没有观察者,也会更新自己的值
  5. 都会产生粘性事件问题
  6. 都可能产生丢失值的问题

不同点:😮

  1. StateFlow必须在构建的时候传入初始值,LiveData不需要
  2. StateFlow默认是防抖的,LiveData默认不防抖
  3. 对于android来说StateFlow默认没有和生命周期绑定,直接使用会有问题,请继续向下看🙆‍♀️

StateFlow特别说明

下面就上方的几点特别的做一下说明:😎

StateFlow会产生粘性事件:这个其实在上面的例子就可以说明了,StateFlow是必有值的,只要有订阅者订阅,这个值立马就会发送过去。具体可以理解为,在ViewModel中放了一个StateFlow之后在Activity中对值进行了调用弹出一个SnackBar,在屏幕旋转之后会再次弹出SnackBar。

StateFlow丢失值问题:当上游发射的速度,大于订阅者处理的速度的时候,中间值就会丢失了,下面具体看一个例子👀:

    val stateFlow = MutableStateFlow(value.get())
    runBlocking 
        stateFlow.onEach 
            //模拟处理耗时
            delay(3000)
            log("receiver:$it")
        .launchIn(this)
        while (true) 
            delay(1000)
            val v = value.incrementAndGet()
            log("send:$v")
            stateFlow.emit(v)
        
    
//结果
10:58:26:004 [main] send:1
10:58:27:031 [main] send:2
10:58:27:990 [main] receiver:0
10:58:28:038 [main] send:3
10:58:29:054 [main] send:4
10:58:30:057 [main] send:5
10:58:30:996 [main] receiver:2
10:58:31:070 [main] send:6
10:58:32:073 [main] send:7
10:58:33:076 [main] send:8
10:58:34:004 [main] receiver:5
...

上方模拟处理耗时是3秒,发送则是每隔一秒钟发一次。处理耗时是发送的三倍,那么中间的值就丢失了。

至于之前讲Flow时说到的三种背压处理策略,这里只能使用collectLatest,发送新值时取消之前的操作,可以参考之前Flow里面的详细介绍。

StateFlow默认是防抖的

这一点可以看一下源码:🙆‍♀️

	emit()调用了setValue
    setValue 调用了 updateState
    看一下 updateState
    private fun updateState(expectedState: Any?, newState: Any): Boolean 
        var curSequence = 0
        var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
        synchronized(this) 
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0)  // even sequence means quiescent state flow (no ongoing update)
                curSequence++ // make it odd
                sequence = curSequence
             else 
                // update is already in process, notify it, and return
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true // updated
            
            curSlots = slots // read current reference to collectors under lock
        

关注一下这一行代码if (oldState == newState) return true,更新数据时,会判断当前值与新值是否相同,如果相同则不更新数据。

关于可读可写仅可读的使用官方给出一下的例子,可以参考:🙆‍♀️

class CounterModel 
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter = _counter.asStateFlow() // publicly exposed as read-only state flow

    fun inc() 
        _counter.value++
    

另外注意一点:StateFlow 是一个热流。所以只要流被收集或从垃圾收集根存在对它的任何其他引用,它就会保留在内存中。

Android中使用StateFlow实践

我们知道直接在UI中对收集流时不安全的,所以需要判断判断UI的生命周期,来决定是否开启或者取消流的收集,对应于Flow就是是否启动或者取消收集的协程。我们知道LiveData自身有一个LifecycleBoundObserver需要传入当前的LifecycleOwner用于绑定声明周期,在onStop之后就会停止收集,那么StateFlow有没有呢?没有!😂

但是Google官方在有一个扩展函数,对生命周期进行了管理,方便使用。就是repeatOnLifecycle,它会在传入的生命周期开始启动一个协程,然后在与之对应的另一个状态取消携程,它是一个挂起函数,在onDestroy之后才会恢复下方的执行,举个例子如下所示👀

	override fun onCreate(savedInstanceState: Bundle?) 
        ...
        // 使用lifecycleScope启动协程
        lifecycleScope.launch 
            // 每次生命周期处于 STARTED 状态(或更高状态)时,repeatOnLifecycle 在新的协程中启动块,并在它停止时取消它。
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) 
                // 
                ViewModel.uiState.collect  uiState ->
                    // todo
                
            
            //如果这里被执行,则代表生命周期已经走到了onDestroy,因为repeatOnLifecycle是挂起函数,在生命周期为onDestroy的时候进行了恢复。
        
    

注意:repeatOnLifecycle API 仅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01 库及更高版本中可用。后续会有文章会对repeatOnLifecycle做源码分析,敬请期待!

SharedFlow使用

SharedFlow简介

SharedFlow也是一个热流。它是StateFlow 的高度可配置的泛化,提供了更多的能力。换句话说,StateFlow是一个特殊的SharedFlow。

SharedFlow同样有两个版本,SharedFlowMutableSharedFlow

那为什么说StateFlow是一个特殊的SharedFlow呢😮?我们来看一下继承关系就知道了

StateFlow 继承自 SharedFlow MutableStateFlow 继承自 MutableSharedFlow

SharedFlow的使用

在这之前,我们先看一下StateFlow与SharedFlow有什么不同

  • SharedFlow没有默认值
  • SharedFlow可以保存旧的数据,根据配置可以将旧的数据回播给新的订阅者
  • SharedFlow使用emit/tryEmit发射数据,StateFlow内部其实都是调用的setValue
  • SharedFlow会挂起直到所有的订阅者处理完成。

看一下MutableSharedFlow的构建函数

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

可以看得出,MutableSharedFlow不需要传入默认值。

解析一下构建MutableSharedFlow的三个参数的含义

  • replay:重播给新订阅者的值的数量(不能为负,默认为零)
  • extraBufferCapacity:除重播外缓冲的值的数量。 当有剩余缓冲区空间时,emit 不会挂起(可选,不能为负,默认为零)。
  • onBufferOverflow:配置缓冲区溢出的操作(可选,默认BufferOverflow.SUSPEND,缓存溢出时挂起;另外还有DROP_OLDEST与DROP_LATEST,分别是溢出时删除缓冲区中最旧的值,将新值添加到缓冲区,不要挂起。与在缓冲区溢出时删除当前添加到缓冲区的最新值(以便缓冲区内容保持不变),不要挂起。)

下面看几个使用的例子🙆‍♀️

  • 默认情况下,SharedFlow没有粘性事件。如下所示:🙆‍♀️

        var v = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking 
            sharedFlow.emit(++v)
            val job = sharedFlow.onEach 
                log("receiver$it")
            .launchIn(this)
            delay(5000)
            job.cancel()
            log("end")
        
        //输出
        20:38:19:818 [main] end
    

    因为默认情况下,replay的值为0,也就是说在订阅的时候,将会有0个值回播给订阅者,所以就没有粘性事件了!

  • 再看一下正常使用的情况

        val sharedFlow = MutableSharedFlow<Int>()
        val value = AtomicInteger(0)
        runBlocking 
            GlobalScope.launch 
                sharedFlow.collect 
                    log("receiver value $it")
                
            
            GlobalScope.launch 
                delay(3000)
                sharedFlow.collect 
                    log("receiver2 value $it")
                
            
            while (true) 
                delay(1000)
                sharedFlow.emit(value.incrementAndGet())
            
        
        //结果
    20:55:34:565 [DefaultDispatcher-worker-2] receiver value 1
    20:55:35:552 [DefaultDispatcher-worker-2] receiver value 2
    20:55:36:560 [DefaultDispatcher-worker-2] receiver value 3
    20:55:36:560 [DefaultDispatcher-worker-1] receiver2 value 3
    20:55:37:570 [DefaultDispatcher-worker-1] receiver value 4
    20:55:37:570 [DefaultDispatcher-worker-2] receiver2 value 4
    ...
    

    默认的使用除了粘性事件之外,其他的和StateFlow就没有什么区别了。所以如果为了解决粘性事件的问题,可以使用SharedFlow。但是注意一点:SharedFlow是不防抖的

  • SharedFlow默认是要等到订阅者全部接收到并且处理完成之后,才会进行下一次发送,否则就是挂起

    var value = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking 
            sharedFlow.onEach 
                delay(3000)
                log("receiver1:$it")
            .launchIn(GlobalScope)
            sharedFlow.onEach 
                delay(1000)
                log("receiver2:$it")
            .launchIn(GlobalScope)
            while (true) 
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            
        
        //输出
    22:35:45:787 [main] send:1
    22:35:46:822 [main] send:2
    22:35:46:822 [DefaultDispatcher-worker-2] receiver2:1
    22:35:47:833 [DefaultDispatcher-worker-2] receiver2:2
    22:35:48:823 [DefaultDispatcher-worker-2] receiver1:1
    22:35:49:829 [main] send:3
    22:35:50:834 [DefaultDispatcher-worker-2] receiver2:3
    22:35:51:825 [DefaultDispatcher-worker-2] receiver1:2
    22:35:52:837 [main] send:4
    22:35:53:843 [DefaultDispatcher-worker-2] receiver2:4
    22:35:54:832 [DefaultDispatcher-worker-2] receiver1:3
    22:35:55:838 [main] send:5
    ...
    

    可以看得出,我们仅仅设置send的延迟为1秒钟,但是因为第二个订阅者模拟处理时间为3秒钟,所以emit会等最后一个处理完成才会进行下一次的发射,可以看到从send:2以后,每次发射都间隔三秒钟了,保证所有的订阅者全部接收到数据。如果是StateFlow的话,是不管订阅者是否处理完成的,依旧会保持值的替换

  • 其中一个订阅者出现异常怎么办呢

    这个就不放代码了,异常的传播机制和协程的异常传播机制还是一样的。如果订阅者和发送的在同一个作用域则异常传播机制同协程作用域。如果和发送者分别在不同的作用域则订阅者出现异常,影响不到别的订阅者和发送的协程。

  • SharedFlow同一个作用域下丢失值的问题,和启动协程有关,这里简单介绍一下什么情况下会出现值丢失的情况,下一篇会详细分析协程的恢复与挂起实现原理,会讲到具体的原因,可以关注博主

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking 
            sharedFlow.onEach 
                log("receiver1:$it")
            .launchIn(this)
            while (true) 
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            
        
        //结果
    09:27:08:434 [main] send:1
    09:27:09:471 [main] send:2
    09:27:09:471 [main] receiver1:2
    09:27:10:477 [main] send:3
    09:27:10:477 [main] receiver1:3
    09:27:11:493 [main] send:4
    09:27:11:493 [main] receiver1:4
    ...
    

    显而易见,第一个值没有被接受到,这是为什么呢?我查看了一下源码,按照顺序执行下来的话,在相同的作用域下启动协程,如果下面不挂起的话,上面的协程是不会启动的,所以第一个值就没有被发送了(SharedFlow在无订阅者的时候会丢失值);如果launchIn的作用域不是当前的作用域的话,也就是可以单独设置一个GlobalScope的话,此时就是没有问题的,所以上面的情况需要注意下一下。

  • 如果需要回播就值给新的订阅者,可以设置replay

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>(replay = 4)
        runBlocking 
            launch 
                sharedFlow.collect 
                    log("receiver1:$it")
                
            
            launch 
                delay(2000)
                sharedFlow.collect 
                    log("receiver2:$it")
                
            
            launch 
                delay(4000)
                sharedFlow.collect 
                    log("receiver3:$it")
                
            
            while (true) 
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            
        
    //结果
    11:17:13:977 [main] send:1
    11:17:14:002 [main] receiver1:1
    11:17:15:009 [main] send:2
    11:17:15:009 [main] receiver1:2
    11:17:16:019 [main] receiver2:1
    11:深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow

    深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow

    Kotlin Flow 冷流 StateFlow 热流 StateFlow 的应用

    onEach 更改 StateFlow 中的调度程序(kotlin 协程)

    收集 StateFlow 时 Api 没有响应不会更改 kotlin

    Kotlin上的反应式流-SharedFlow和StateFlow