Kotlin flow实践总结

Posted 嘴巴吃糖了

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin flow实践总结相关的知识,希望对你有一定的参考价值。

背景

最近学了下Kotlin Flow,顺便在项目中进行了实践,做一下总结。

Flow是什么

按顺序发出多个值的数据流。
本质就是一个生产者消费者模型,生产者发送数据给消费者进行消费。

  • 冷流:当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。
    生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。
  • 热流:不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。
    生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据

实践场景

场景一:简单列表数据的加载状态

简单的列表显示场景,可以使用onStart,onEmpty,catch,onCompletion等回调操作符,监听数据流的状态,显示相应的加载状态UI。

  • onStart:在数据发射之前触发,onStart所在的线程,是数据产生的线程
  • onCompletion:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程
  • onEmpty:当数据流结束了,缺没有发出任何元素的时候触发。
  • catch:数据流发生错误的时候触发
  • flowOn:指定上游数据流的CoroutineContext,下游数据流不会受到影响
private fun coldFlowDemo() 
    //创建一个冷流,在3秒后发射一个数据
    val coldFlow = flow<Int> 
        delay(3000)
        emit(1)
    
    lifecycleScope.launch(Dispatchers.IO) 
        coldFlow.onStart 
            Log.d(TAG, "coldFlow onStart, thread:$Thread.currentThread().name")
            mBinding.progressBar.isVisible = true
            mBinding.tvLoadingStatus.text = "加载中"
        .onEmpty 
            Log.d(TAG, "coldFlow onEmpty, thread:$Thread.currentThread().name")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载为空"
        .catch 
            Log.d(TAG, "coldFlow catch, thread:$Thread.currentThread().name")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载错误:$it"
        .onCompletion 
            Log.d(TAG, "coldFlow onCompletion, thread:$Thread.currentThread().name")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "加载完成"
        
            //指定上游数据流的CoroutineContext,下游数据流不会受到影响
            .flowOn(Dispatchers.Main)
            .collect 
                Log.d(TAG, "coldFlow collect:$it, thread:$Thread.currentThread().name")
            
    

比如上面的例子。 使用flow构建起函数,创建一个冷流,3秒后发送一个值到数据流中。 使用onStart,onEmpty,catch,onCompletion操作符,监听数据流的状态。

日志输出:

coldFlow onStart, thread:main
coldFlow onCompletion, thread:main
coldFlow collect:1, thread:DefaultDispatcher-worker-1

场景二:同一种数据,需要加载本地数据和网络数据

在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。
但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。

  1. 本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
  2. 网络数据比本地数据先加载完成:
  • 网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
  • 网络数据加载失败,可以继续尝试使用本地数据进行兜底。
  1. 本地数据和网络数据都加载失败:通知上层数据加载失败

实现CacheRepositity

将上面的逻辑进行简单封装成一个基类,CacheRepositity。
相应的子类,只需要实现两个方法即可。

  • CResult:代表加载结果,Success 或者 Error。
  • fetchDataFromLocal(),实现本地数据读取的逻辑
  • fetchDataFromNetWork(),实现网络数据获取的逻辑
abstract class CacheRepositity<T> 
    private val TAG = "CacheRepositity"

    fun getData() = channelFlow<CResult<T>> 
        supervisorScope 
            val dataFromLocalDeffer = async 
                fetchDataFromLocal().also 
                    Log.d(TAG,"fetchDataFromLocal result:$it , thread:$Thread.currentThread().name")
                    //本地数据加载成功  
                    if (it is CResult.Success) 
                        send(it)
                    
                
            

            val dataFromNetDeffer = async 
                fetchDataFromNetWork().also 
                    Log.d(TAG,"fetchDataFromNetWork result:$it , thread:$Thread.currentThread().name")
                    //网络数据加载成功  
                    if (it is CResult.Success) 
                        send(it)
                        //如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
                        dataFromLocalDeffer.cancel()
                    
                
            

            //本地数据和网络数据,都加载失败的情况
            val localData = dataFromLocalDeffer.await()
            val networkData = dataFromNetDeffer.await()
            if (localData is CResult.Error && networkData is CResult.Error) 
                send(CResult.Error(Throwable("load data error")))
            
        
    

    protected abstract suspend fun fetchDataFromLocal(): CResult<T>

    protected abstract suspend fun fetchDataFromNetWork(): CResult<T>



sealed class CResult<out R> 
    data class Success<out T>(val data: T) : CResult<T>()
    data class Error(val throwable: Throwable) : CResult<Nothing>()

测试验证

写个TestRepositity,实现CacheRepositity的抽象方法。
通过delay延迟耗时来模拟各种场景,观察日志的输出顺序。

private fun cacheRepositityDemo()
    val repositity=TestRepositity()
    lifecycleScope.launch 
        repositity.getData().onStart 
            Log.d(TAG, "TestRepositity: onStart")
        .onCompletion 
            Log.d(TAG, "TestRepositity: onCompletion")
        .collect 
            Log.d(TAG, "collect: $it")
        
    

本地数据比网络数据加载快

class TestRepositity : CacheRepositity<String>() 
    override suspend fun fetchDataFromLocal(): CResult<String> 
        delay(1000)
        return CResult.Success("data from fetchDataFromLocal")
    

    override suspend fun fetchDataFromNetWork(): CResult<String> 
        delay(2000)
        return CResult.Success("data from fetchDataFromNetWork")
    

模拟数据:本地加载delay1秒,网络加载delay2秒
日志输出:collect 执行两次,先收到本地数据,再收到网络数据。

onStart
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据比本地数据加载快

class TestRepositity : CacheRepositity<String>() 
    override suspend fun fetchDataFromLocal(): CResult<String> 
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    

    override suspend fun fetchDataFromNetWork(): CResult<String> 
        delay(1000)
        return CResult.Success("data from fetchDataFromNetWork")
    

模拟数据:本地加载delay 2秒,网络加载delay 1秒
日志输出:collect 只执行1次,只收到网络数据。

onStart
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据加载失败,使用本地数据

class TestRepositity : CacheRepositity<String>() 
    override suspend fun fetchDataFromLocal(): CResult<String> 
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    

    override suspend fun fetchDataFromNetWork(): CResult<String> 
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    

模拟数据:本地加载delay 2秒,网络数据加载失败
日志输出:collect 只执行1次,只收到本地数据。

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
onCompletion

网络数据和本地数据都加载失败

class TestRepositity : CacheRepositity<String>() 
    override suspend fun fetchDataFromLocal(): CResult<String> 
        delay(2000)
        return CResult.Error(Throwable("fetchDataFromLocal Error"))
    

    override suspend fun fetchDataFromNetWork(): CResult<String> 
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    

模拟数据:本地数据加载失败,网络数据加载失败
日志输出: collect 只执行1次,结果是CResult.Error,代表加载数据失败。

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
collect: Error(throwable=java.lang.Throwable: load data error)
onCompletion

场景三:多种数据源,按照顺序合并进行展示

在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。 比如类似这种页面,3种数据,需要由3个网络请求获取得到,然后再进行相应的显示。

实现目标:

  1. 接口间不需要互相等待,哪些数据先回来,就先展示哪部分
  2. 控制数据的显示顺序

flow combine操作符

可以合并多个不同的 Flow 数据流,生成一个新的流。 只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。

例子

class HomeViewModel : ViewModel() 

    //暴露给View层的列表数据
    val list = MutableLiveData<List<String?>>()

    //多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
    private val bannerFlow = MutableStateFlow<String?>(null)
    private val channelFlow = MutableStateFlow<String?>(null)
    private val listFlow = MutableStateFlow<String?>(null)


    init 
        //使用combine操作符
        viewModelScope.launch 
            combine(bannerFlow, channelFlow, listFlow)  bannerData, channelData, listData ->
                Log.d("HomeViewModel", "combine  bannerData:$bannerData,channelData:$channelData,listData:$listData")
                //只要子flow里面的数据不为空,就放到resultList里面
                val resultList = mutableListOf<String?>()
                if (bannerData != null) 
                    resultList.add(bannerData)
                
                if (channelData != null) 
                    resultList.add(channelData)
                
                if (listData != null) 
                    resultList.add(listData)
                
                resultList
            .collect 
                //收集combine之后的数据,修改liveData的值,通知UI层刷新列表
                Log.d("HomeViewModel", "collect: $it.size")
                list.postValue(it)
            
        
    

    fun loadData() 
        viewModelScope.launch(Dispatchers.IO) 
            //模拟耗时操作
            async 
                delay(1000)
                Log.d("HomeViewModel", "getBannerData success")
                bannerFlow.emit("Banner")
            
            async 
                delay(2000)
                Log.d("HomeViewModel", "getChannelData success")
                channelFlow.emit("Channel")
            
            async 
                delay(3000)
                Log.d("HomeViewModel", "getListData success")
                listFlow.emit("List")
            
        
    

HomeViewModel

  1. 提供一个 LiveData 的列表数据给View层使用
  2. 内部有3个子 flow ,分别负责相应数据的生产。(这里简单都返回String,实际场景根据需要,返回相应的数据类型即可)。
  3. 通过 combine 操作符,组合这3个子flow的数据。
  4. collect 接收生成的新数据,并修改liveData的数据,通知刷新UI

View层使用

private fun flowCombineDemo() 
    val homeViewModel by viewModels<HomeViewModel>()
    homeViewModel.list.observe(this) 
        Log.d("HomeViewModel", "observe size:$it.size")
    
    homeViewModel.loadData()

简单的创建一个 ViewModel ,observe 列表数据对应的 LiveData。
通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。

日志输出:
combine  bannerData:null,channelData:null,listData:null
collect: 0
observe size:0

getBannerData success
combine  bannerData:Banner,channelData:null,listData:null
collect: 1
observe size:1

getChannelData success
combine  bannerData:Banner,channelData:Channel,listData:null
collect: 2
observe size:2

getListData success
combine  bannerData:Banner,channelData:Channel,listData:List
collect: 3
observe size:3

总结

具体场景,具体分析。刚好这几个场景,配合Flow进行使用,整体实现也相对简单了一些。

以上是关于Kotlin flow实践总结的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )

Kotlin Flow 冷流 StateFlow 热流 StateFlow 的应用

android kotlin 协程 源码浅析

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Flow 操作符 shareIn 和 stateIn 使用须知