深潜Kotlin协程:协程作用域函数

Posted RikkaTheWorld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深潜Kotlin协程:协程作用域函数相关的知识,希望对你有一定的参考价值。

系列电子书:传送门


想象一下,在挂起函数中,你需要同时从两个(或者多个)源头获取数据,在了解如何正确的处理之前,我们先来看看一些次优的方法。

引入协程作用域函数前使用的方法

第一种方法是从挂起函数中调用挂起函数,这个方案问题点是它不是并发的(如果从每一个源头获取数据需要1秒,那么这个函数将需要2秒而不是1秒)。

// 数据是线性准备好的,而非同时准备好
suspend fun getUserProfile(): UserProfileData 
    val user = getUserData() // (1 sec)
    val notifications = getNotifications() // (1 sec)
    
    return UserProfileData(
        user = user,
        notifications = notifications,
    )

要并发地执行两个挂起,最简单的方法是使用 async 包装它们,然而, async 需要一个作用域,使用 GlobalScope 并不是一个好主意。

// 不要这么做
suspend fun getUserProfile(): UserProfileData 
    val user = GlobalScope.async  getUserData() 
    val notifications = GlobalScope.async  getNotifications() 
    return UserProfileData(
        user = user.await(), // (1 sec)
        notifications = notifications.await(),
    )

GlobalScope 只是一个带有 EmptyCoroutineContext 的作用域。

public object GlobalScope : CoroutineScope 
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext

如果在 GlobalScope 上调用 async,则与父协程没有联系,这意味着协程:

  • 不能被取消(如果父协程被取消, async 内部仍在运行,因此在它们会造成浪费资源)
  • 不从任何父节点上继承作用域(它将始终运行在默认的调度器上,不会遵守任何来自父协程的上下文)

重要的结论是:

  • 潜在的内存泄漏和 CPU 冗余的使用
  • 用于单元测试协程的工具在这里不起作用,所以测试这个函数是非常困难的

这不是一个很好的解决方案,让我们看看另外一个例子,我们将作用域做为参数传递:

// 不要这么做
suspend fun getUserProfile(
    scope: CoroutineScope
): UserProfileData 
    val user = scope.async  getUserData() 
    val notifications = scope.async  getNotifications() 
    return UserProfileData(
        user = user.await(), // (1 sec)
        notifications = notifications.await(),
    )


// 或者

// 不要这么做
suspend fun CoroutineScope.getUserProfile(): UserProfileData 
    val user = async  getUserData() 
    val notifications = async  getNotifications() 
    return UserProfileData(
        user = user.await(), // (1 sec)
        notifications = notifications.await(),
    )

这一个稍微好一些,因为现在可以取消协程,和适当的进行单元测试。问题是这个解决方案要求将作用域从一个函数传递给另一个函数。此外,这样的函数在作用域中可能会造成不必要的副作用,例如,如果在 async 中有一个异常,整个作用域将会被关闭(假设它使用的是 Job 而非 SuipervisorJob)。而且,能够访问作用域的函数很容易在外部滥用这种访问,例如,使用 cancel 方法取消这个作用域,这就是为什么这种方法可能是棘手的和有潜在危险的。

data class Details(val name: String, val followers: Int)
data class Tweet(val text: String)

fun getFollowersNumber(): Int = throw Error("Service exception")

suspend fun getUserName(): String 
    delay(500)
    return "marcinmoskala"


suspend fun getTweets(): List<Tweet> 
    return listOf(Tweet("Hello, world"))


suspend fun CoroutineScope.getUserDetails(): Details 
    val userName = async  getUserName() 
    val followersNumber = async  getFollowersNumber() 
    return Details(userName.await(), followersNumber.await())


fun main() = runBlocking 
    val details = try 
        getUserDetails()
     catch (e: Error) 
        null
    
    val tweets = async  getTweets() 
    println("User: $details")
    println("Tweets: $tweets.await()")

// Only Exception...

在上面的代码中,即使我们在拉取 userDetails 时出现异常,也希望至少看到 tweets 会被打印。但不幸的是,getFollowersNumber 上的一个异常中断了 async,它会中断整个作用域(包括 getTweets() )并结束程序。与之相对的,我们更希望函数在发生异常时仅仅是抛出异常而非中止。是时候介绍我们的英雄: coroutineScope 了。

coroutineScope

coroutineScope 是一个用于启动作用域的挂起函数。它返回由入参函数产生的值。

suspend fun <R> coroutineScope(
    block: suspend CoroutineScope.() -> R
): R

async 或者 launch 不同, coroutineScope 的函数体就是就地调用的。它会正式的创建一个新的协程,而且它会挂起前一个协程,直到新协程完成。因此它不会启动任何并发任务。看下下面的示例,其中两个 delay 都挂起了 runBlocking

fun main() = runBlocking(CoroutineName("Main")) 
    print("context $coroutineContext job:$coroutineContext[Job]\\n")
    val a = coroutineScope 
        print("context1: $coroutineContext job:$coroutineContext[Job]\\n")
        delay(1000)
        10
    
    println("a is calculated")
    val b = coroutineScope 
        print("context2: $coroutineContext job:$coroutineContext[Job]")
        delay(1000)
        20
    
    println(a) // 10
    println(b) // 20


//context [CoroutineName(Main), BlockingCoroutineActive@5ef04b5, BlockingEventLoop@5f4da5c3] job:BlockingCoroutineActive@5ef04b5
// context1: [CoroutineName(Main), ScopeCoroutineActive@108c4c35, BlockingEventLoop@5f4da5c3] job:ScopeCoroutineActive@108c4c35
// (1 sec)
// a is calculated
// context2: [CoroutineName(Main), ScopeCoroutineActive@2957fcb0, BlockingEventLoop@5f4da5c3] job:ScopeCoroutineActive@2957fcb010
// (1 sec)
// 10
// 20

coroutineScope 提供的是继承了外部作用域的 coroutineContext 的上下文作用域,但它覆盖了来自父节点上下文的 Job。因此,coroutineScope 将和其父节点建立下面这些规则:

  • 从父节点那里继承上下文
  • 在完成自己之前等待所有的子节点
  • 当父节点被取消时,所有子节点也会被取消

下面的示例中,你可以观察到 “After” 将在末尾打印出来,因为 coroutineScope 只有等待它所有子协程完成之后才会完成自己。此外, CoroutineName 也可以正常的从父节点传递给子节点:

suspend fun longTask() = coroutineScope 
    launch 
        delay(1000)
        val name = coroutineContext[CoroutineName]?.name
        println("[$name] Finished task 1")
    
    launch 
        delay(2000)
        val name = coroutineContext[CoroutineName]?.name
        println("[$name] Finished task 2")
    


fun main() = runBlocking(CoroutineName("Parent")) 
    println("Before")
    longTask()
    println("After")


// Before
// (1 sec)
// [Parent] Finished task 1
// (1 sec)
// [Parent] Finished task 2
// After

在下一个代码段中,你可以看到取消是如何工作的,父节点被取消后,未完成的子节点也会被取消。

suspend fun longTask() = coroutineScope 
    launch 
        delay(1000)
        val name = coroutineContext[CoroutineName]?.name
        println("[$name] Finished task 1")
    
    launch 
        delay(2000)
        val name = coroutineContext[CoroutineName]?.name
        println("[$name] Finished task 2")
    


fun main(): Unit = runBlocking 
    // job 是 coroutineScope 的父节点
    val job = launch(CoroutineName("Parent")) 
        longTask()
    
    delay(1500)
    job.cancel()

// [Parent] Finished task 1

与协程构建器不同,如果在 coroutineScope 或它的任意子协程中有一个异常,它会取消所有其他的子协程并重新抛出它。这就是为什么使用 coroutineScope 可以修复我们之前的 “Twitter”示例。为了展示同样的异常被重新抛出,我将泛型 Error 更改为一个具体的 ApiException

data class Details(val name: String, val followers: Int)
data class Tweet(val text: String)

class ApiException(
    val code: Int,
    message: String
) : Throwable(message)

fun getFollowersNumber(): Int = throw ApiException(500, "Service unavailable")

suspend fun getUserName(): String 
    delay(500)
    return "marcinmoskala"


suspend fun getTweets(): List<Tweet> 
    return listOf(Tweet("Hello, world"))


suspend fun getUserDetails(): Details = coroutineScope 
    val userName = async  getUserName() 
    val followersNumber = async  getFollowersNumber() 
    Details(userName.await(), followersNumber.await())


fun main() = runBlocking<Unit> 
    val details = try 
        getUserDetails()
     catch (e: ApiException) 
        null
    
    val tweets = async  getTweets() 
    println("User: $details")
    println("Tweets: $tweets.await()")

// User: null
// Tweets: [Tweet(text=Hello, world)]

这使得当我们只需要在一个挂起函数中启动多个并发的调用时, coroutineScope 在大多数情况下都是一个完美的候选。

suspend fun getUserProfile(): UserProfileData =
    coroutineScope 
        val user = async  getUserData() 
        val notifications = async  getNotifications() 
        UserProfileData(
            user = user.await(),
            notifications = notifications.await(),
        )
    

正如我们已经提到的, coroutineScope 现在经常被用来包装一个挂起函数的主体。你可以把它看出是 runBlocking 函数的现成替代品。

suspend fun main(): Unit = coroutineScope 
    launch 
        delay(1000)
        println("World")
    
    println("Hello, ")

// Hello
// (1 sec)
// World

函数 coroutineScope 在挂起上下文之外创建了一个额外的作用域。它从父节点继承作用域,并支持结构化并发性。
为了说明这一点,下面的两个函数实际上没有区别,除了第一个函数是依次调用 getProfilegetFriends,而第二个函数是同时调用它们。

suspend fun produceCurrentUserSeq(): User 
    val profile = repo.getProfile()
    val friends = repo.getFriends()
    return User(profile, friends)

suspend fun produceCurrentUserSym(): User = coroutineScope 
    val profile = async  repo.getProfile() 
    val friends = async  repo.getFriends() 
    User(profile.await(), friends.await())

coroutineScope 是一个有用的协程作用域函数,但它不是唯一的。

协程作用域函数

还有更多创建作用域的函数,它们的行为与 coroutineScope 类似, supervisorScope 类似于 coroutineScope,但它使用的是 SupervisorJob 而不是 JobwithContext 是一个可以修改协程上下文的 coroutineScopewithTimeout 是一个带有超时设置的 coroutineScope。这些函数将在本章的以下部分更好的解释。现在,我只想让你们知道这这组函数如此的类似,那么它们应该有一个名字,我们该如何命名这个函数组呢?有些人称之为“scoping fuction”,但我觉得这会令人感到困惑,因为我不确定“scoping”是什么意思。我想命名的人只是想它不同于 “scope function”(作用域函数,如 letwithapply 等),这并没有真正帮助,因为这两个术语经常被混淆。这就是为什么我决定使用“Coroutine scope fuction”(协程作用域函数),它名字更长但是应该会引起更少的误解,我后面发现它更正确,请你想一想:协程作用域函数是那些用于在挂起函数中创建协程作用域的函数。

另一方面,协程作用域函数经常与协程构建器混淆,这是不正确的,因为它们在概念和实践上都非常不同。为了证明这一点,下表给出了它们之间的比较:

协程构建器(除了 runBlocking协程作用域函数
launchasyncproducecoroutineScopesupervisorScopewithContextwithTimeout
都是 CoroutineScope 的扩展函数都是挂起函数
携带来自 CoroutineScope 的协程上下文携带挂起函数 continuation 的协程上下文
异常通过 Job 传递给父协程异常的抛出方式与常规函数抛出的异常的方式相同
启动一个异步协程(可能不会马上执行)就地启动一个协程

再来想想 runBlocking,你可能会注意到,它看起来和协程作用域函数有一些共同点,而不像构建器那样。 runBlocking 也会就地调用它的主体并返回结果。最大的区别是 runBlocking 必须位于协程层次结构的根部,而协程作用域函数必须位于中间位置。

withContext

withContext 函数类似于 coroutineScope,但它还允许对协程作用域进行一些更改。作为参数提供给这个函数的上下文将会覆盖来自父作用域的上下文(与协程构建器中的方法相同)。这意味着 withContext(EmptyCooutineContext)coroutineScope 的行为完全相同。

fun CoroutineScope.log(text: String) 
    val name = this.coroutineContext[CoroutineName]?.name
    println("[$name] $text")


fun main() = runBlocking(CoroutineName("Parent")) 
    log("Before")
    
    withContext(CoroutineName("Child 1")) 
        delay(1000)
        log("Hello 1")
    
    withContext(CoroutineName("Child 2")) 
        delay(1000)
        log("Hello 2")
    

    log("After")

// [Parent] Before
// (1 sec)
// [Child 1] Hello 1
// (1 sec)
// [Child 2] Hello 2
// [Parent] After

函数 withContext 通常用于为部分代码设置不同的协程作用域。通常,你应该将它与 dispatchers 一起使用,这将在下一章节中描述。

launch(Dispatchers.Main) 
    view.showProgressBar()
    withContext(Dispatchers.IO) 
        fileRepository.saveData(data)
    
    view.hideProgressBar()

supervisorScope

supervisorScope 函数的行为也很像 coroutineScope:它创建了一个 CoroutineScope,从外部作用域继承而来,并在其中调用指定的挂起块。不同的是,它用 SupervisorJob 重写了来自上下文的 Job,所以当子协程发生异常时,它不会被取消。

fun main() = runBlocking 
    println("Before")
    supervisorScope 
        launch 
            delay(1000)
            throw Error()
        
        launch 
            delay(2000)
            println("Done")
        
    
    println("After")

// Before
// (1 sec)
// Exception...
// (1 sec)
// Done
// After

supervisScope 主要用于启动多个独立任务的函数。

suspend fun notifyAnalytics(actions: List<UserAction>) =
    supervisorScope 
        actions.forEach  action ->
            launch 
                notifyAnalytics(action)
            
        
    

如果使用 async,将其异常传播到父级是不够的。当调用 await 时,async 协程将以一个异常结束,然后 await 将重新抛出该异常。这就是为什么如果我们真的想忽略异常,我们还应该用 try-catch 块来包装 await 调用。

class ArticlesRepositoryComposite(
    private val articleRepositories: List<ArticleRepository>
) : ArticleRepository 
    override suspend fun fetchArticles(): List<Article> = supervisorScope 
        articleRepositories
        .map  async  it.fetchArticles()  
        .mapNotNull 
            try 
                it.await()
             catch (e: Throwable) 
                e.printStackTrace()
                null
            
        
        .flatten()
        .sortedByDescending  it.publishedAt 
    

在我的知识星球中,我经常被问到是否可以使用 withContext(SuperviseJob()) 来代替 supervisorScope。答案是不,我们不能这样做。当我们使用 withContext(SupervisorJob()) 时,withContext 仍然使用一个常规的 Job,只是 SupervisorJob 是它的父级 job。因此当一个子协程发生异常时,其它的子协程也会被取消。withContext 也会抛出异常,因此它的 SuperviseJob 是无用的。这就是为什么我发现 withContext(SuperviseJob()) 毫无意义且具有误导性,我认为这是一个糟糕的实践。

fun main() = runBlocking 
    println("Before")
    withContext(SupervisorJob()) 
        launch 
            delay(1000)
            thro

以上是关于深潜Kotlin协程:协程作用域函数的主要内容,如果未能解决你的问题,请参考以下文章

深潜Kotlin协程:协程作用域函数

深潜Kotlin协程:协程作用域函数

深潜Kotlin协程(二十):构建 Flow

深潜Kotlin协程(二十):构建 Flow

深潜Koltin协程:协程的取消

深潜Kotlin协程(二十二):Flow的处理