深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题
Posted 川峰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题相关的知识,希望对你有一定的参考价值。
热数据通道 Channel
Channel 实际上就是 个并发安全的队列,它可以用来连接协程,实现不同协程的通信,代码如代码清单所示
suspend fun testChannel()
val channel = Channel<Int>()
var i = 0
//生产者 发
val producer = GlobalScope.launch
while (true)
delay(1000)
channel.send(i++)
//消费者 收
val consumer = GlobalScope.launch
while (true)
val value = channel.receive()
println("received <<<<<<<<<<<<<<<<<< $value")
producer.join()
consumer.join()
上述代码 构造了两个协程 producer 和 consumer, 没有为它们明确指定调度器,所以它们都是采用默认调度器,在 Java 平台上就是基于线程池实现的 Default。 它们可以运行在不同的线程上,也可以运行在同一个线程上,具体执行流程如图 6-2 所示。
producer 每隔 1s 向 Channel 发送 1 个数,consumer 一直读取 channel 来获取这个数字并打印,显然发送端比接收端更慢,在没有值可以读到的时候, receive 是挂起的,直到有新元素到达。
这么看来,receive 一定是一个挂起函数,那么 send 呢?
你会发现 send 也是挂起函数。发送端为什么会挂起?以我们熟知的 BlockingQueue 为例,当我们往其中添加元素的时候,元素在队列里实际上是占用了空间的,如果这个队列空间不足,那么再往其中添加元素的时候就会出现两种情况:
- 阻塞.等待队列腾出空间
- 异常,拒绝添加元素。
send 也会面临同样的问题, Channel 实际上就是个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用 receive 并取走元素, send 就需要挂起,等待接收者取走数据之后再写入 Channel 。
Channel缓冲区
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity)
RENDEZVOUS ->
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
CONFLATED ->
require(onBufferOverflow == BufferOverflow.SUSPEND)
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
ConflatedChannel(onUndeliveredElement)
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
)
else ->
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
我们构造 Channel 的时候调用了一个名为 Channel 的函数,虽然两个 “Channel" 起来是 样的,但它却确实不是 Channel 的构造函数。在 Kotlin 中我们经常定义 一个顶级函数来伪装成同名类型的构造器,这本质上就是工厂函数。Channel 函数有一个参数叫 capacity, 该参数用于指定缓冲区的容量,RENDEZVOUS 默认值为 0,RENDEZVOUS 本意就是描述“不见不散"的场景, 如果不调用 receive, send 就会一直挂起等待。如果把上面代码中consumer的channel.receive()注释掉,则producer中send方法第一次调用就会挂起。
- Channel(Channel.RENDEZVOUS ) 的方式是有人接收才会继续发,边收边发,如果没有接受的,则发送者会挂起等待。
- Channel(Channel.UNLIMITED ) 的方式是发送者发送完毕,就直接返回,不管有没有接受者。
- Channel(Channel.CONFLATED ) 的方式是不管发送者发了多少个,接受者只能收到最后一个,也是发送完就返回了,不管有没有接受者。
- Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定buffer大小。
- Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据。
Channel的迭代
Channel可以通过迭代器迭代访问:
GlobalScope.launch
val iterator = channel.iterator()
while (iterator.hasNext()) // 挂起点
println("received <<<<<<<<<<<<<<<<<< $iterator.next()")
其中,iterator.hasNext() 是挂起函数,在判断是否有下个元素的时候就需要去Channel 中读取元素了,这个写法自然可以简化成 for-in。
GlobalScope.launch
for (element in channel)
println("received <<<<<<<<<<<<<<<<<< $element")
生产者和消费者协程构造器
我们可以通过 produce 方法启动一个生产者协程,并返回 ReceiveChannel,其他协程就可以用这个 Channel 来接收数据了。反过来,我们可以用 actor 启动一个消费者协程。
suspend fun producer()
val receiveChannel = GlobalScope.produce
for (i in 0..3)
send(i)
println("send --------------> $i")
val consumer = GlobalScope.launch
for (i in receiveChannel)
println("received <<<<<<<<<<<<<<<< $i")
consumer.join()
suspend fun consumer()
val sendChannel = GlobalScope.actor<Int>
for (i in this)
println("received <<<<<<<<<<<<<<<< $i")
val producer = GlobalScope.launch
for (i in 0..3)
sendChannel.send(i)
println("send --------------> $i")
producer.join()
使用这两种构造器也可以指定Channel对应的缓冲区类型,如:
val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED)
for (i in 0..3)
send(i)
ReceiveChannel SendChannel 都是 Channel 的父接口,前者定义了 receive, 后者定义了 send, Channel 也因此既可以使用 receive 又可以使用 send。
通过 produce 和 actor 这两个协程构造器启动的协程也与返回的 Channel 自然地绑定到了一起,因此在协程结束时返回的 Channel 也会被立即关闭。
以 produce 为例,它构造出了一个 ProducerCoroutine 对象,该对象也是 Job 的实现:
private class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E>
override val isActive: Boolean
get() = super.isActive
override fun onCompleted(value: Unit)
_channel.close() // 协程完成时关闭channel
override fun onCancelled(cause: Throwable, handled: Boolean)
val processed = _channel.close(cause) // 协程取消时关闭channel
if (!processed && !handled) handleCoroutineException(context, cause)
注意,在协程完成和取消的方法调用 中, 对应的_channel 都会被关闭。produc actor 这两个构造器看上去都很有用,不过目前前者仍被标记为 Experimental CoroutinesApi, 后者则被标记为 ObsoleteCoroutinesApi, 后续仍然可能会有较大的改动。
Channel的关闭
对千一个 Channel 如果我们调用了它的 close() 方法,它会立即停止接收新元素,也就是说这时候它的 isClosedForSend 会立即返回 true 而由于 Channel 缓冲区的存在, 这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。
一说到关闭,我们很容易想到 I/0, 如果不关闭 1/0 可能会造成资源泄露。那么 Channel 关闭有什么意义呢?前面我们提到过,Channel 内部的资源其实就是个缓冲区,如果我们创建 Channel 而不去关闭它。虽然并不会造成系统资源的泄露,但却会让接收端一直处千挂起等待的状态,因此一定要在适当的时机关闭 Channel。
究竟由谁来关闭Channel,需要根据业务场景由发送端和接受端之间进行协商决定。如果发送端关闭了Channel,接受端还在调用receive方法,会导致异常,这时就需要进行异常处理:
suspend fun testChannel2()
val channel = Channel<Int>()
//生产者 发
val producer = GlobalScope.launch
for (i in 0..3)
println("sending --------------> $i")
channel.send(i)
channel.close() // 发送端关闭channel
//消费者 收
val consumer = GlobalScope.launch
try
while (true)
val value = channel.receive()
// val value = channel.receiveCatching() // 这个方法不会抛出异常
println("received <<<<<<<<<<<<<<<<<< $value")
catch (e : ClosedReceiveChannelException)
println("catch ClosedReceiveChannelException: $e.message")
producer.join()
consumer.join()
发送端关闭了Channel,接受端还在调用receive方法,会抛出ClosedReceiveChannelException异常,如果使用receiveCatching()遇到close时就不会抛出异常,但是会使用null作为返回结果。
BroadcastChannel
创建 broadcastCbannel 的方法与创建普通的 Channel 几乎没有区别:
val broadcastChannel = broadcastChannel<Int>(5)
如果要订阅功能,那么只 要调用如下方法
val receiveChannel = broadcastChannel.openSubscription()
这样我们就得到了一个 ReceiveChannel,如果想要想获取订阅的消息,只需要调用它的 receive 函数;如果想要取消订阅则调用 cancel 函数即可。
我们来看一个比较完整的例子,本示例中我们在发送端发 0 1 2, 并启动 3个协程同时接收广播,相关代码如下所示。
suspend fun broadcast()
//下面几种都可以创建一个BroadcastChannel
//val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
//val broadcastChannel = Channel<Int>(Channel.BUFFERED).broadcast()
val broadcastChannel = GlobalScope.broadcast
for (i in 0..2)
send(i)
//启动3个子协程作为接受者,每个都能收到
List(3) index ->
GlobalScope.launch
val receiveChannel = broadcastChannel.openSubscription() // 订阅
for (i in receiveChannel)
println("[#$index] received: $i")
.joinAll()
除了直接创建以外,我们也可以用前面定义的普通 Channel 进行转换,代码如下所示。
// 通过 Channel 实例直接创建广播
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast()
Channel 版本的序列生成器
// 使用channel模拟序列生成器
val channel = GlobalScope.produce
println("A")
send(1)
println("B")
send(2)
println("Done")
for (item in channel)
println("get $item")
冷数据流Flow
Sequence中不能调用其他挂起函数,不能设置调度器,只能单线程中使用。而Flow可以支持:
// 序列生成器中不能调用其他挂起函数
sequence
(1..3).forEach
yield(it)
delay(100) // ERROR
创建Flow
val intFlow = flow
(1..3).forEach
emit(it)
delay(100)
Flow 也可以设定它运行时所使用的调度器:
intFlow.flowDn(Dispatchers.IO)
通过 flowOn 设置的调度器只对它之前的操作有影响,因此这里意味着 intFlow 的构造逻辑会在 IO 调度器上执行。
最终读取 intFlow 需要调用 collect 函数, 这个函数也是一个挂起函数。我们启动一个协程来消费 intFlow, 代码如下所示
suspend fun testFlows()
val dispatcher = Executors.newSingleThreadExecutor
Thread(it, "MyThread").also it.isDaemon = true
.asCoroutineDispatcher()
GlobalScope.launch(dispatcher)
val intFlow = flow
(1..3).forEach
emit(it)
println("$Thread.currentThread().name: emit $it")
delay(1000)
intFlow.flowOn(Dispatchers.IO)
.collect
println("$Thread.currentThread().name: collect $it")
.join()
为了方便区分,我们为协程设置了一个自定义的调度器,它会将协程调度到名叫 MyThread 的线程上,结果如下:
也就是说,collect中的代码运行在Global.launch指定的调度器上,flow… 中的代码运行在 flowOn 指定的调度器上。
对比 RxJava 的线程切换
RxJava 也是 个基千响应式编程模型的异步框架,它提供了两个切换调度器的 API, 分别是 subscribeOn observeOn, 其中 subscribeOn 指定的调度器执行被观察者的代码, observeOn 指定调度器运行观察者的代码,观察者最后在observeOn 指定调度器上收集结果。flowOn 就相当于subscribeOn,而 launch的调度器 就相当于 observeOn 指定的调度器。
在一个 Flow 创建出来之后,不消费则不生产,多次消费则多次生产,生产和消费总是相对应的,代码如下所示。
GlobalScope.launch(dispatcher)
val intFlow = flow
(1..3).forEach
emit(it)
delay(1000)
// Flow 可以被重复消费
intFlow.collect println(it)
intFlow.collect println(it)
.join()
消费它会输出 “1, 2, 3", 重复消费它会重复输出“1, 2, 3"。
这一点类似于我们前面提到的序列生成器和 RxJava 的例子,它们也都有自己的消费端。我们创建序列后去迭代它,每次迭代都会创建一个新的迭代器从头开始迭代RxJava Observable 也是如此,每次调用它的 subscribe 都会重新消费一次。
所谓冷数据流,就是只有消费时才会生 的数据流,这一点 Channel 正好相反,Channel 发送端并不依赖于接收端。
异常处理
Flow 的异常处理也比较 接,直接调用 catch 函数即可,如下所示。
如果想要在 Flow 完成时执行逻辑,可以使用 onCompletion,onCompletion 用起来类似于 try… catch… finally 中的 finally, 无论前面是否存在异常,它都会被调用,参数 t 则是前面未捕获的异常。
这套处理机制的设计初衷是确保 Flow 操作中异常的透明。因此,直接使用 try - catch - finally 的写法是违反 Flow 的设计原则的。
suspend fun exception()
flow<Int>
emit(1)
throw ArithmeticException("Div 0")
.catch t: Throwable ->
log("caught error: $t")
.onCompletion t: Throwable? ->
log("finally.")
.flowOn(Dispatchers.IO)
.collect log(it)
// 不推荐直接使用 try - catch - finally 写法
// flow // bad!!!
// try
// emit(1)
// throw ArithmeticException("Div 0")
// catch (t: Throwable)
// log("caught error: $t")
// finally
// log("finally.")
//
//
我们在 Flow 操作内部使用 try… catch. … finally, 这样的写法后续可能会被禁用。
// Flow 从异常中恢复
flow
emit(1)
throw ArithmeticException("divide 0")
.catch t : Throwable ->
println("caught error $t")
emit(10)
这里我们可以使用 emit 重新生产新元素。细 心的读者一定会发现, emit 定义在 FlowCollector 中,因此只要遇到 Receiver 为 FlowCollector 的函数,我们就可以生产新元素。
末端操作符
前面的例子中,我们用 collect 消费 flow 的数据 collect 是最基本的末端操作符,功能与 RxJava 的 subscribe 类似。
除了 collect 之外,还有其他常见的末端操作符,它们大体分为两类:
- 集合类型转换操作符,包括 toList toSet
- 聚合操作符,包括将 flow 规约到单值的 reduce、fold 等操作;还有获得单个元素的操作符,包括 sing、singleOrNull、first 等。
实际上,识别是否为末端操作符 ,还有一个简单方法 :由于 Flow 的消费端一定需要运行在协程中 ,因此末端操作符都是挂起函数。
分离 Flow 的消费和触发
我们除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。这样消费的具体操作就不需要与末端操作符放到一起, collect 函数可以放到其他任意位置调用,例如代码如下所示。
// 分类 Flow 的消费和触发
fun createFlow() = flow<Int>
(1..3).forEach
emit(it)
delay(100)
.onEach println(it)
fun main()
GlobalScope.launch
createFlow().collect()
由此,我们又可以衍生出一种新的消费 Flow 的写法,代码如下所示。
fun main()
// 使用协程作用域直接触发 Flow
createFlow().launchIn(GlobalScope)
其中, launchln 函数只接收 CoroutineScope 类型的参数。
Flow的取消
Flow 没有提供取消操作的方法,因为并不需要。
我们前面已经介绍了 Flow 的消费依赖于 collect 这样的末端操作符,而它们又必须在协程中调用,因此 Flow 的取消主要依赖于末端操作符所在的协程的状态。
以上是关于深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )
深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮
Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )
Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )
Kotlin 协程协程并发安全问题 ( 使用 Atomic 并发安全类型 | 使用 Channel 通道 | 使用 Mutext 轻量级锁 | 使用 Semaphore 轻量级信号量 )