Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )相关的知识,希望对你有一定的参考价值。

文章目录





一、背压概念



" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,

数据 生产者生产效率 大于 数据 消费者消费效率 , 就会产生 背压 ;


处理背压问题 , 有 2 种方案 :

  • 降低 数据 生产者 的生产效率 ;
  • 提高 数据 消费者 的消费效率 ;

背压代码示例 : 以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking 
            val delta = measureTimeMillis 
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().collect 
                    delay(200)
                    println("收集元素 $it , 当前线程 $Thread.currentThread().name")
                
            
            println("收集元素耗时 $delta ms")
        
    

    suspend fun flowEmit() = flow<Int> 
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) 
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 $Thread.currentThread().name")
        
    

执行结果 : 收集元素的耗时总共耗费了 2284 ms ;

23:37:49.496 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:37:49.496 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:37:49.878 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:37:49.879 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:37:50.259 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:37:50.259 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:37:50.600 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:37:50.600 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:37:50.973 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:37:50.973 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:37:51.352 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:37:51.353 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:37:51.353 System.out   kim.hsl.coroutine     I  收集元素耗时 2284 ms





二、使用缓冲处理背压问题



调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking 
            val delta = measureTimeMillis 
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().buffer(10).collect 
                    delay(200)
                    println("收集元素 $it , 当前线程 $Thread.currentThread().name")
                
            
            println("收集元素耗时 $delta ms")
        
    

    suspend fun flowEmit() = flow<Int> 
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) 
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 $Thread.currentThread().name")
        
    

执行结果 : 发射元素后 , 将发射的元素缓存起来 , 然后慢慢接收元素 ;

23:39:41.401 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:39:41.543 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:39:41.644 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:39:41.646 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:39:41.760 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:39:41.877 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:39:41.879 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:39:42.022 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:39:42.120 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:39:42.364 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:39:42.572 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:39:42.814 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:39:42.821 System.out   kim.hsl.coroutine     I  收集元素耗时 1601 ms





三、使用 flowOn 处理背压问题



上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;

使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking 
            val delta = measureTimeMillis 
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().flowOn(Dispatchers.Default).collect 
                    delay(200)
                    println("收集元素 $it , 当前线程 $Thread.currentThread().name")
                
            
            println("收集元素耗时 $delta ms")
        
    

    suspend fun flowEmit() = flow<Int> 
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) 
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 $Thread.currentThread().name")
        
    

执行结果 :

23:45:19.675 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 DefaultDispatcher-worker-1
23:45:19.817 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 DefaultDispatcher-worker-1
23:45:19.918 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:45:19.921 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 DefaultDispatcher-worker-1
23:45:20.046 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 DefaultDispatcher-worker-1
23:45:20.124 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:45:20.186 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 DefaultDispatcher-worker-1
23:45:20.292 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 DefaultDispatcher-worker-1
23:45:20.333 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:45:20.548 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:45:20.790 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:45:21.000 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:45:21.007 System.out   kim.hsl.coroutine     I  收集元素耗时 1507 ms





四、从提高收集元素效率方向解决背压问题



从提高收集元素效率方向解决背压问题 :

  • 调用 Flow#conflate 函数 , 合并发射元素项 , 不对每个值进行单独处理 ;
  • 调用 Flow#collectLatest 函数 , 取消并重新发射最后一个元素 , 只关心最后一个结果 , 不关心中间的过程值 ;

1、Flow#conflate 代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking 
            val delta = measureTimeMillis 
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().conflate().collect 
                    delay(200)
                    println("收集元素 $it , 当前线程 $Thread.currentThread().name")
                
            
            println("收集元素耗时 $delta ms")
        
    

    suspend fun flowEmit() = flow<Int> 
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) 
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 $Thread.currentThread().name")
        
    

执行结果 : 发射了 6 个元素 , 但是只接收到了 5 个元素 , 元素 2 被过滤掉了 ;

23:49:21.720 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:49:21.855 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:49:21.924 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:49:21.992 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:49:22.129 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:49:22.130 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:49:22.270 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:49:22.333 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:49:22.374 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:49:22.564 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:49:22.805 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:49:22.814 System.out   kim.hsl.coroutine     I  收集元素耗时 1277 ms


2、Flow#collectLatest 代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking 
            val delta = measureTimeMillis 
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().collectLatest 
                    delay(200)
                    println("收集元素 $it , 当前线程 $Thread.currentThread().name")
                
            
            println("收集元素耗时 $delta ms")
        
    

    suspend fun flowEmit() = flow<Int> 
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) 
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 $Thread.currentThread().name")
        
    

执行结果 : 只接收了最后一个元素 , 前几个元素没有接收 ;

23:53:01.328 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:53:01.461 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:53:01.603 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:53:01.712 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:53:01.857 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
以上是关于Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )