Kotlin 协程Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )相关的知识,希望对你有一定的参考价值。

文章目录





一、CoroutineScope#produce 构造生产者协程



通过 CoroutineScope#produce 函数 , 可以快速构造一个 生产者协程 , 其返回值是 ReceiveChannel 实例对象 , 这样就可以在消费者协程中通过该 ReceiveChannel 实例获取并消费数据 ;


1、CoroutineScope#produce 函数原型


CoroutineScope#produce 函数原型 :

/**
 * 启动一个新的协程,通过将值发送到通道来生成值流
 * 并返回对协程的引用作为[receichannnel]。这个结果
 * 对象可以用于[receive][receichannchannel .]接收此协程产生的]个元素。
 *
 * 协程的作用域包含[ProducerScope]接口,该接口实现
 * [CoroutineScope]和[SendChannel],这样协程就可以调用
 * [将][SendChannel。直接发送)。通道已关闭[SendChannel.close]
 * 当协程完成时。
 * 当其接收通道为[cancelled][receivecchannel .cancel]时,正在运行的协程将被取消。
 *
 * 协程上下文继承自这个[CoroutineScope]。可以使用[context]参数指定其他上下文元素。
 * 如果上下文没有任何dispatcher或其他[ContinuationInterceptor],则[Dispatchers. dispatcher]。使用“Default]”。
 * 父作业也继承自[CoroutineScope],但它也可以被重写
 * 使用相应的[context]元素。
 *
 * 此协程中任何未捕获的异常将以此异常作为原因和关闭通道
 * 结果通道将变成_failed_,因此此后任何试图从它接收的尝试都会抛出异常。
 *
 * 生成的通道类型取决于指定的[capacity]参数。
 * 详细信息请参见[Channel]接口文档。
 *
 * 请参阅[newCoroutineContext],以获得新创建的协程可用的调试工具的描述。
 *
 * **注意:这是一个实验性的api。**在父范围内作为孩子工作的制作人的行为
 * 取消和错误处理将来可能会更改。
 *
 * @param context   附加到[CoroutineScope。coroutineContext]协程的上下文。
 * @param capacity  通道缓冲区的容量(默认情况下没有缓冲区)。
 * @param block     协程代码。
 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)

2、代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            runBlocking 

                val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce 
                    for (i in 0..3) 
                        delay(1000)
                        channel.send(i)
                        println("向通道中发送数据 $i")
                    
                

                // 数据消费者协程
                val consumer = GlobalScope.launch 
                    while (true) 
                        for(num in receiveChannel) 
                            println("从通道中获取数据 $num")
                        
                    
                
            
        
    

执行结果 :

22:35:52.720 System.out   kim.hsl.coroutine     I  向通道中发送数据 0
22:35:52.721 System.out   kim.hsl.coroutine     I  从通道中获取数据 0
22:35:53.764 System.out   kim.hsl.coroutine     I  向通道中发送数据 1
22:35:53.765 System.out   kim.hsl.coroutine     I  从通道中获取数据 1
22:35:54.786 System.out   kim.hsl.coroutine     I  向通道中发送数据 2
22:35:54.787 System.out   kim.hsl.coroutine     I  从通道中获取数据 2
22:35:55.835 System.out   kim.hsl.coroutine     I  从通道中获取数据 3
22:35:55.839 System.out   kim.hsl.coroutine     I  向通道中发送数据 3





二、CoroutineScope#actor 构造消费者协程



通过 CoroutineScope#actor 函数 , 可以快速构造一个 消费者协程 ;


1、CoroutineScope#actor 函数原型


CoroutineScope#actor 函数原型 :

/**
 * 启动从其邮箱通道接收消息的新协程
 * 并返回对其邮箱通道的引用作为[SendChannel]。由此产生的
 * 对象可以用来[发送][SendChannel。向这个协程发送]条消息。
 *
 * 协程的作用域包含[ActorScope]接口,该接口实现
 * [CoroutineScope]和[receichannnel],这样协程就可以调用
 * [接受][ReceiveChannel。直接接收)。通道已关闭[SendChannel.close]
 * 当协程完成时。
 *
 * 协程上下文继承自[CoroutineScope],可以使用[context]参数指定其他上下文元素。
 * 如果上下文既没有任何dispatcher,也没有任何其他[ContinuationInterceptor],那么[Dispatchers. dispatcher .]使用“Default]”。
 * 父作业也继承自[CoroutineScope],但它也可以被重写
 * 带有相应的[context]元素。
 *
 * 默认情况下,协程立即被安排执行。
 * 其他选项可以通过“start”参数指定。详见[coroutinstart]。
 * 可选参数[start]可设置为[coroutinstart]。启动协程_lazy。在这种情况下,
 * 它将在第一条消息上隐式启动
 * 【发送】【SendChannel。发送到此演员的邮箱通道。
 *
 * 此协程中未捕获的异常将以此异常作为原因和关闭通道
 * 结果通道变成_failed_,因此任何发送到该通道的尝试都会抛出异常。
 *
 * 生成的通道类型取决于指定的[capacity]参数。
 * 详见[Channel]接口文档。
 *
 * 参见[newCoroutineContext] [CoroutineScope。newCoroutineContext]用于描述可用于新创建的协程的调试工具。
 *
 * 使用演员
 *
 * 角色构建器的典型用法如下:
 *
 * ```
 * val c = actor 
 *     // initialize actor's state
 *     for (msg in channel) 
 *         // process message here
 *     
 * 
 * // send messages to the actor
 * c.send(...)
 * ...
 * // stop the actor when it is no longer needed
 * c.close()
 * ```
 *
 * ###停止并取消演员
 *
 * 当参与者的收件箱通道[关闭][SendChannel.]关闭]它向参与者发送一个特殊的“关闭令牌”。
 * 参与者仍然处理已经发送的所有消息,然后“' for (msg in channel) '”循环终止
 * 演员完成了。
 *
 * 如果需要在不处理已经发送给它的所有消息的情况下中止参与者,则
 * 它将与父job一起创建:
 *
 * ```
 * val job = Job()
 * val c = actor(context = job)   ... 
 * ...
 * // abort the actor
 * job.cancel()
 * ```
 *
 * 当演员的父工作被[取消][工作。取消],那么演员的工作就取消了。这意味着
 * " ' for (msg in channel) ' "和其他可取消的挂起函数抛出[CancellationException]和actor
 * 在不处理剩余消息的情况下完成。
 *
 * **注意:此API将在未来的更新中随着复杂角色的引入而过时
 * 参见[issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87)。
 *
 * @param context       附加到[CoroutineScope。coroutineContext]协程的上下文。
 * @param capacity      通道缓冲区的容量(默认情况下没有缓冲区)。
 * @param start         协程启动选项。缺省值为[coroutinstart . default]。
 * @param onCompletion  参与者协程的可选完成处理程序(参见[Job.invokeOnCompletion])
 * @param block         协程代码。
 */
@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> 
    val newContext = newCoroutineContext(context)
    val channel = Channel<E>(capacity)
    val coroutine = if (start.isLazy)
        LazyActorCoroutine(newContext, channel, block) else
        ActorCoroutine(newContext, channel, active = true)
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine


2、代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            runBlocking 
                // 数据消费者协程
                val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> 
                    while (true) 
                        val num = receive()
                        println("从通道中获取数据 $num")
                    
                

                // 数据生产者协程
                val producer = GlobalScope.launch 
                    for (i in 0..3) 
                        sendChannel.send(i)
                        println("向通道中发送数据 $i")
                    
                
            
        
    

执行结果 :

22:43:12.093 System.out   kim.hsl.coroutine     I  从通道中获取数据 0
22:43:12.095 System.out   kim.hsl.coroutine     I  向通道中发送数据 0
22:43:12.096 System.out   kim.hsl.coroutine     I  向通道中发送数据 1
22:43:12.097 System.out   kim.hsl.coroutine     I  从通道中获取数据 1
22:43:12.098 System.out   kim.hsl.coroutine     I  从通道中获取数据 2
22:43:12.098 System.out   kim.hsl.coroutine     I  向通道中发送数据 2
22:43:12.099 System.out   kim.hsl.coroutine     I  从通道中获取数据 3
22:43:12.102 System.out   kim.hsl.coroutine     I  向通道中发送数据 3

以上是关于Kotlin 协程Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )

Kotlin 协程Channel 通道 ② ( Channel 通道容量 | Channel 通道迭代 | 使用 iterator 迭代器进行迭代 | 使用 for in 循环进行迭代 )

Kotlin 协程Channel 通道 ② ( Channel 通道容量 | Channel 通道迭代 | 使用 iterator 迭代器进行迭代 | 使用 for in 循环进行迭代 )

Kotlin 协程Channel 通道 ① ( Channel#send 发送数据 | Channel#receive 接收数据 )

Kotlin 协程Channel 通道 ① ( Channel#send 发送数据 | Channel#receive 接收数据 )

Kotlin 协程Channel 通道 ④ ( Channel 通道的热数据流属性 | Channel 通道关闭过程 | Channel 通道关闭代码示例 )