Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )
Posted 韩曙亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )相关的知识,希望对你有一定的参考价值。
文章目录
一、背压概念
" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,
数据 生产者 的 生产效率 大于 数据 消费者 的 消费效率 , 就会产生 背压 ;
处理背压问题 , 有 2 种方案 :
- 降低 数据 生产者 的生产效率 ;
- 提高 数据 消费者 的消费效率 ;
背压代码示例 : 以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking
val delta = measureTimeMillis
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collect
delay(200)
println("收集元素 $it , 当前线程 $Thread.currentThread().name")
println("收集元素耗时 $delta ms")
suspend fun flowEmit() = flow<Int>
// 以 100 ms 的间隔发射元素
for (i in 0..5)
delay(100)
emit(i)
println("发射元素 $i , 当前线程 $Thread.currentThread().name")
执行结果 : 收集元素的耗时总共耗费了 2284 ms ;
23:37:49.496 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:37:49.496 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:37:49.878 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:37:49.879 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:37:51.352 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 收集元素耗时 2284 ms
二、使用缓冲处理背压问题
调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking
val delta = measureTimeMillis
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().buffer(10).collect
delay(200)
println("收集元素 $it , 当前线程 $Thread.currentThread().name")
println("收集元素耗时 $delta ms")
suspend fun flowEmit() = flow<Int>
// 以 100 ms 的间隔发射元素
for (i in 0..5)
delay(100)
emit(i)
println("发射元素 $i , 当前线程 $Thread.currentThread().name")
执行结果 : 发射元素后 , 将发射的元素缓存起来 , 然后慢慢接收元素 ;
23:39:41.401 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:39:41.543 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:39:41.644 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:39:41.646 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:39:41.760 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:39:41.877 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:39:41.879 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:39:42.022 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:39:42.120 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:39:42.364 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:39:42.572 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:39:42.814 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:39:42.821 System.out kim.hsl.coroutine I 收集元素耗时 1601 ms
三、使用 flowOn 处理背压问题
上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;
使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking
val delta = measureTimeMillis
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().flowOn(Dispatchers.Default).collect
delay(200)
println("收集元素 $it , 当前线程 $Thread.currentThread().name")
println("收集元素耗时 $delta ms")
suspend fun flowEmit() = flow<Int>
// 以 100 ms 的间隔发射元素
for (i in 0..5)
delay(100)
emit(i)
println("发射元素 $i , 当前线程 $Thread.currentThread().name")
执行结果 :
23:45:19.675 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 DefaultDispatcher-worker-1
23:45:19.817 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 DefaultDispatcher-worker-1
23:45:19.918 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:45:19.921 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 DefaultDispatcher-worker-1
23:45:20.046 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 DefaultDispatcher-worker-1
23:45:20.124 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:45:20.186 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 DefaultDispatcher-worker-1
23:45:20.292 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 DefaultDispatcher-worker-1
23:45:20.333 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:45:20.548 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:45:20.790 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:45:21.000 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:45:21.007 System.out kim.hsl.coroutine I 收集元素耗时 1507 ms
四、从提高收集元素效率方向解决背压问题
从提高收集元素效率方向解决背压问题 :
- 调用 Flow#conflate 函数 , 合并发射元素项 , 不对每个值进行单独处理 ;
- 调用 Flow#collectLatest 函数 , 取消并重新发射最后一个元素 , 只关心最后一个结果 , 不关心中间的过程值 ;
1、Flow#conflate 代码示例
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking
val delta = measureTimeMillis
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().conflate().collect
delay(200)
println("收集元素 $it , 当前线程 $Thread.currentThread().name")
println("收集元素耗时 $delta ms")
suspend fun flowEmit() = flow<Int>
// 以 100 ms 的间隔发射元素
for (i in 0..5)
delay(100)
emit(i)
println("发射元素 $i , 当前线程 $Thread.currentThread().name")
执行结果 : 发射了 6 个元素 , 但是只接收到了 5 个元素 , 元素 2 被过滤掉了 ;
23:49:21.720 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:49:21.855 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:49:21.924 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:49:21.992 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:49:22.129 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:49:22.130 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:49:22.270 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:49:22.333 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:49:22.374 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:49:22.564 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:49:22.805 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:49:22.814 System.out kim.hsl.coroutine I 收集元素耗时 1277 ms
2、Flow#collectLatest 代码示例
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking
val delta = measureTimeMillis
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collectLatest
delay(200)
println("收集元素 $it , 当前线程 $Thread.currentThread().name")
println("收集元素耗时 $delta ms")
suspend fun flowEmit() = flow<Int>
// 以 100 ms 的间隔发射元素
for (i in 0..5)
delay(100)
emit(i)
println("发射元素 $i , 当前线程 $Thread.currentThread().name")
执行结果 : 只接收了最后一个元素 , 前几个元素没有接收 ;
23:53:01.328 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:53:01.461 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:53:01.603 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:53:01.712 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:53:01.857 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
以上是关于Kotlin 协程Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )
Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )
Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )
Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )
Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )
Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )