Kotlin协程-并发处理-基础

Posted 且听真言

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程-并发处理-基础相关的知识,希望对你有一定的参考价值。

一、协程与并发

Kotlin协程是基于线程执行的。经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同。

在java的世界里,并发往往是多个线程一起工作,存在共享的变量。需要处理好同步问题。要避免把协程与线程的概念混淆。

runBlocking 
        var i = 0
        launch(Dispatchers.Default) 
            repeat(1000) 
                i++
            
        
        delay(1000L)
        println(i)
    

Log
1000

Process finished with exit code 0

上述代码中没有任何并发任务,launch创建了一个协程,所有的计算都发生在协程中。所以不需要考虑同步问题。

1.协程并发问题

多个协程并发执行的例子:

 

runBlocking 
        var i = 0
        val jobs = mutableListOf<Job>()

        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    i++
                
            
            jobs.add(job)
        
        jobs.joinAll()
        println(i)
    

9933

Process finished with exit code 0

上述代码中,创建了10个协程任务,每个协程任务都会工作在Default线程池中,这10个协程任务对i进行1000次自增操作,但是因为10个协程分别运行在不同的线程之前,且共享一个变量,所以会产生同步问题。

 2.协程处理并发的手段

在Java中的同步手段有:synchronized、Atomic、Lock等;

2.1使用@Synchronized注解或者synchronized()代码块

 runBlocking 
        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    synchronized(lock) 
                        i++
                    
                
            
            jobs.add(job)
        
        jobs.joinAll()
        println(i)

    

10000

Process finished with exit code 0

 

如何在上面的synchronized代码块中加入挂起函数,则发现会报错。

如下:

  runBlocking 
        suspend fun prepare() 

        

        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    synchronized(lock) 
                        prepare()
                        i++
                    
                
            
            jobs.add(job)
        
        jobs.joinAll()
        println(i)
    

 所以可以发现不能在synchronized当中调用挂起函数,编译器会报错。因为挂起函数会被翻译成带有Continuation的异步函数,造成synchronized代码块无法同步处理。

2.2协程并发思路

  • 单线程并发

在Kotlin协程中可以实现单线程并发。

runBlocking 
        suspend fun getResult1(): String 
            printlnCoroutine("Start getResult1")
            delay(1000L)
            printlnCoroutine("End getResult1")
            return "Result1"
        

        suspend fun getResult2(): String 
            printlnCoroutine("Start getResult2")
            delay(1000L)
            printlnCoroutine("End getResult2")
            return "Result2"
        

        suspend fun getResult3(): String 
            printlnCoroutine("Start getResult3")
            delay(1000L)
            printlnCoroutine("End getResult3")
            return "Result3"
        

        val results = mutableListOf<String>()

        val time = measureTimeMillis 
            val result1 = async 
                getResult1()
            

            val result2 = async 
                getResult2()
            


            val result3 = async 
                getResult3()
            

            results.add(result1.await())
            results.add(result2.await())
            results.add(result3.await())
        

        println("Time:$time")
        println(results)

    


fun printlnCoroutine(any: Any?) 
    println("" + any + ";Thread:" + Thread.currentThread().name)



Log

Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]

Process finished with exit code 0

上面代码启动了三个协程,它们之间是并发执行的,每个协程耗时1000ms,总耗时1000多毫秒,而且这几个协程都运行在main线程上。

所以 可以考虑将i++逻辑分发到单线程之上。

 runBlocking 
        val coroutineDispatcher = Executors.newSingleThreadExecutor 
            Thread(it, "MySingleThread").apply 
                isDaemon = true
            
        .asCoroutineDispatcher()

        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) 
            val job = launch(coroutineDispatcher) 
                repeat(1000) 
                        i++
                
            
            jobs.add(job)
        
        jobs.joinAll()
        println(i)
    

10000

Process finished with exit code 0

 上述代码把所有协程任务分发到单独的线程中执行,但这10个协程是并发执行的。

  • Mutex

在java中,Lock之类的同步锁是阻塞式的,而Kotlin提供了非阻塞式的锁:Mutex。

   runBlocking 
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()

        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    mutex.lock()
                    i++
                    mutex.unlock()
                
            
            jobs.add(job)
        

        jobs.joinAll()
        println(i)
    

Log
10000

Process finished with exit code 0

 上述代码使用mutex.lock()、 mutex.unlock()包裹同步计算逻辑,实现多线程同步。Mutex 对比 JDK 当中的锁,最大的优势就在于支持挂起和恢复。

public interface Mutex 
  
  
    public val isLocked: Boolean

    
    public fun tryLock(owner: Any? = null): Boolean


    public suspend fun lock(owner: Any? = null)

   
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>


    public fun holdsLock(owner: Any): Boolean

  
    public fun unlock(owner: Any? = null)

Mutex 是一个接口,它的 lock() 方法其实是一个挂起函数。而这就是实现非阻塞式同步锁的根本原因。

但是上述代码中对于 Mutex 的使用其实是错误的,会存在问题。如果代码在 mutex.lock()、mutex.unlock() 之间发生异常,从而导致 mutex.unlock() 无法被调用。这个时候,整个程序的执行流程就会一直卡住,无法结束。看下面代码:

runBlocking 
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()

        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    mutex.lock()
                    i++
                    i/0
                    mutex.unlock()
                
            
            jobs.add(job)
        

        jobs.joinAll()
        println(i)
    

 

如何解决?使用mutex.withLock。

代码入下:

 runBlocking 
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    mutex.withLock 
                        i++
                    
                
            
            jobs.add(job)
        
        jobs.joinAll()

        println(i)
    

10000

Process finished with exit code 0

 

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T 
    contract  
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    

    lock(owner)
    try 
        return action()
     finally 
        unlock(owner)
    

withLock 的本质,其实是在 finally 当中调用了 unlock()。

  • Actor

Actor,它本质上是基于 Channel 管道消息实现的。

sealed class Msg
object AddMsg : Msg()

class ResultMsg(val result: CompletableDeferred<Int>) : Msg()

fun testCoroutinueConcurrent10() 
    runBlocking 
        suspend fun addActor() = actor<Msg> 
            var counter = 0
            for (msg in channel) 
                when (msg) 
                    is AddMsg -> counter++
                    is ResultMsg -> msg.result.complete(counter)
                
            
        

        val actor = addActor()
        val jobs = mutableListOf<Job>()

        repeat(10) 
            val job = launch(Dispatchers.Default) 
                repeat(1000) 
                    actor.send(AddMsg)
                
            
            jobs.add(job)
        
        jobs.joinAll()
        val deferred = CompletableDeferred<Int>()
        actor.send(ResultMsg(deferred))

        val result = deferred.await()
        actor.close()
        println(result)
    


Log
10000

Process finished with exit code 0

  addActor() 挂起函数,它其实调用了 actor() 这个高阶函数。而这个函数的返回值类型其实是 SendChannel。由此可见,Kotlin 当中的 Actor 其实就是 Channel 的简单封装。Actor 的多线程同步能力都源自于 Channel。这里,我们借助密封类定义了两种消息类型,AddMsg、ResultMsg,然后在 actor 内部,我们处理这两种消息类型,如果我们收到了 AddMsg,则计算“i++”;如果收到了 ResultMsg,则返回计算结果。而在 actor 的外部,我们则只需要发送 10000 次的 AddMsg 消息,最后再发送一次 ResultMsg,取回计算结果即可。Actor 本质上是基于 Channel 管道消息实现的。

  • 避免共享可变状态
 runBlocking 
        val deferreds = mutableListOf<Deferred<Int>>()
        repeat(10) 
            val deferred = async(Dispatchers.Default) 
                var i = 0
                repeat(1000) 
                    i++
                
                return@async i
            
            deferreds.add(deferred)
        
        var result = 0
        deferreds.forEach 
            result += it.await()
        

        println(result)
    

Log

10000

Process finished with exit code 0

在每一个协程当中,都有一个局部的变量 i,同时将 launch 都改为了 async,让每一个协程都可以返回计算结果。这种方式,相当于将 10000 次计算,平均分配给了 10 个协程,让它们各自计算 1000 次。这样一来,每个协程都可以进行独立的计算,然后我们将 10 个协程的结果汇总起来,最后累加在一起。

 runBlocking 
        val result = (1..10).map 
            async(Dispatchers.Default) 
                var i = 0
                repeat(1000) 
                    i++
                
                return@async i
            
        .awaitAll()
            .sum()
        println(result)
    

Log
10000

Process finished with exit code 0

以上是关于Kotlin协程-并发处理-基础的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )

发现不一样的Kotlin多方位处理协程的异常

发现不一样的Kotlin多方位处理协程的异常

Kotlin协程基础

Kotlin 协程基础Coroutine

Kotlin 协程基础Coroutine