面试题 | 等待多个并发结果有哪几种方法?

Posted datian1234

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面试题 | 等待多个并发结果有哪几种方法?相关的知识,希望对你有一定的参考价值。

引子

App 开发中,等待多个异步结果的场景很多见,

比如并发地在后台执行若干个运算,待所有运算执行完毕后归总结果。

比如并发地请求若干个接口,待所有结果返回后刷新界面。

比如统计相册页并发加载 20 张图片的耗时。

其实把若干异步任务串行化是最简单的解决办法,即前一个异步任务执行完毕后再执行下一个。但这样就无法利用多核性能,执行时间被拉长,此时的执行总时长 = 所有任务执行时长的和。

若允许任务并发,则执行总时长 = 执行时间最长任务的耗时。时间性能得以优化,但随之而来的一个复杂度是:“如何等待多个异步结果”。

本文会介绍几种解决方案,并将它们运用到不同的业务场景,比对一下哪个方案适用于哪个场景。

等待并发网络请求

布尔值

假设有如下两个网络请求:

// 拉取新闻
fun fetchNews() 
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  ... 
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  ... 
    )

// 拉取广告
fun fetchAd() 
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> 
        override fun onFailure(call: Call<List<Ad>>, t: Throwable)  ... 
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>)  ... 
    )

广告需要按一定规则插入到新闻列表中。

最简单的做法是,先请求新闻,待其返回后再请求广告。显然这会增加用户等待时间。而且会写出这样的代码:

// 拉取新闻
fun fetchNews() 
    newsApi.fetchNews().enqueue(object : Callback<News> 
        override fun onFailure(call: Call<News>, t: Throwable)  ... 
        override fun onResponse(call: Call<News>, response: Response<News>) 
                // 拉取广告
                newsApi.fetchAd().enqueue(object : Callback<Ad> 
                    override fun onFailure(call: Call<Ad>, t: Throwable)  ... 
                    override fun onResponse(call: Call<Ad>, response: Response<Ad>)  ... 
                )
        
    )

嵌套回调,若再加一个接口,回调层次就会再加一层,不能忍。 用户和程序员的体验都不好,得想办法解决。

第一个想到的方案是布尔值:

var isNewsDone = false
var isAdDone = false
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() 
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  
            isNewsDone = true
            tryRefresh(news, ad)
        
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  
            isNewsDone = true
            news = response.body().result
            tryRefresh(news, ad)
        
    )

// 拉取广告
fun fetchAd() 
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> 
        override fun onFailure(call: Call<List<Ad>>, t: Throwable)  
            isAdDone = true
            tryRefresh(news, ad)
        
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>)  
            isAdDone = true
            ads = response.body().result
            tryRefresh(news, ad)
        
    )

// 尝试刷新界面(只有当两个请求都返回时才刷新)
fun tryRefresh(news: List<News>, ads: List<Ad>) 
    if(isNewsDone && isAdDone) //刷新界面 

设置两个布尔值分别对应两个请求是否返回,并且在每个请求返回时检测两个布尔值,若都为 true 则进行刷新界面。

网络库通常会将请求成功的回调抛到主线程执行,所以这里没有线程安全问题。但如果不是网络请求,而是后台任务,此时需要将布尔值声明为volatile保证其可见性,关于 volatile 更详细的解释可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?

这个方案能解决问题,但只适用于并发请求数量很少的请求,因为每个请求都要声明一个布尔值。而且每增加一个请求都要修改其余请求的代码,可维护性差。

CountdownLatch

更好的方案是CountDownLatch,它是java.util.concurrent包下的一个类,用来等待多个异步结果,用法如下:

val countdownLatch = CountDownLatch(2)//初始化,等待2个异步结果
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() 
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  
            countdownLatch.countDown()
        
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  
            news = response.body().result
            countdownLatch.countDown()
        
    )

// 拉取广告
fun fetchAd() 
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> 
        override fun onFailure(call: Call<List<Ad>>, t: Throwable)  
            countdownLatch.countDown()
        
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>)  
            ads = response.body().result
            countdownLatch.countDown()
        
    )

// countdownLatch 在新线程中等待
thread  
    countdownLatch.await() // 阻塞线程等待两个请求返回
    liveData.postValue() // 抛数据到主线程刷刷新界面
.start()

CountDownLatch 在构造时需传入一个数量,它的语义可以理解为一个计数器。countDown() 将计数器减一,而 await() 会阻塞当前线程直到计数器为 0 才被唤醒。

该计数器是一个 int 值,可能被多线程访问,为了保证线程安全,它被声明为 volatile,并且 countDown() 通过 CAS + 自旋的方式将其减一。

关于 CAS 的介绍可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?

若新增一个接口,只需要将计数器的值加一,并在新接口返回时调用 countDown() 即可,可维护性陡增。

协程

Kotlin 是降低复杂度的大师,它对于这个问题的解决方案可以让代码看上去更简单。

在 Kotlin 的世界里异步操作应该被定义为suspend方法,retrofit 就支持这样的操作,比如:

interface NewsApi 
    @GET("/xxx")
    suspend fun fetchNews(): List<News>
    @GET("/xxx")
    suspend fun fetchAd(): List<Ad>

然后在协程中使用async启动异步任务:

scope.launch 
    // 并发地请求网络
    val newsDefered = async  fetchNews() 
    val adDefered = async  fetchAd() 
    // 等待两个网络请求返回
    val news = newsDefered.await()
    val ads = adDefered.await()
    // 刷新界面
    refreshUi(news, ads)

不管是写起来还是读起来,体验都非常好。因为协程把回调干掉了,逻辑不会跳来跳去。

其中的async()是 CoroutineScope 的扩展方法:

// 启动协程,并返回协程执行结果
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> 
    ...

async() 和 launch() 唯一的不同是它的返回值是Defered,用于描述协程体执行的结果:

public interface Deferred<out T> : Job 
    // 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行
    public suspend fun await(): T

调用async()启动子协程不会挂起外层协程,而是立即返回一个Deferred对象,直到调用Deferred.await()协程的执行才会被挂起。当协程在多个Deferred对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。

但这样写会问题:当广告拉取抛出异常时,新闻拉取也会被取消。

这是协程的一个默认设定,叫结构化并发,即并发是有结构性的。

Java 中线程的并发是没有结构的,所以做如下事情很困难:

  1. 结束一个线程时,如何一并结束它所有的子线程?
  2. 当某个子线程抛出异常时,如何结束和它同一层级的兄弟线程?
  3. 父线程如何等待所有子线程结束之后才结束?

之所以会很困难,是因为 Java 中的线程是没有级联关系的。而 Kotlin 通过协程域 CoroutineScope 以及协程上下文 CoroutineContext 实现级联关系。

在协程中启动的子协程会继承父协程的协程上下文,除了其中的 Job,一个新的 Job 会被创建并归属于父协程的子 Job。通过这套机制,协程和子协程之间有了级联关系,就能实现结构化并发。(以后会就结构化并发写一个系列,敬请期待~)

关于 CoroutineContext 内部结构的详细剖析可以点击Kotlin 协程 | CoroutineContext 为什么要设计成 indexed set?

但有些业务场景不需要子任务之间相互关联,比如当前场景,广告加载失败不应该影响新闻的拉取,大不了不展示广告。为此 kotlin 提供了supervisorScope

scope.launch 
    supervisorScope 
        // 并发地请求网络
        val newsDefered = async  fetchNews() 
        val adDefered = async  fetchAd() 
        // 等待两个网络请求返回
        val news = newsDefered.await()
        val ads = adDefered.await()
        // 刷新界面
        refreshUi(news, ads)
    

supervisorScope 新建一个协程域继承父亲的协程上下文,但会将其中的 Job 重写为SupervisorJob,它的特点就是孩子的失败不会影响父亲,也不会影响兄弟。

现在广告和新闻加载互不影响,各自抛异常都不会影响对方。但就目前的业务场景来说,理想情况是这样的:“广告加载失败不应该影响新闻的加载。但新闻加载失败应该取消广告的加载(因为此时广告也没有展示的机会)”

稍改动下代码:

scope.launch 
    supervisorScope 
        // 并发地请求网络
        val adDefered = async  fetchAd() 
        val newsDefered = async  fetchNews() 
        // 当新闻请求抛异常时,取消广告请求
        newsDefered.invokeOnCompletion  throwable ->
            throwable?.let  adDefered.cancel() 
        
        // 等待新闻
        val news = try 
            newsDefered.await()
         catch (e: Exception) 
            emptyList()
        
        // 等待广告
        val ads = try 
            adDefered.await()
         catch (e: Exception) 
            emptyList()
        
        // 刷新界面
        refreshUi(news, ads)
    

invokeOnCompletion()相当于注册了一个回调,在异步任务结束时调用,不管是正常结束还是因异常而结束。在该回调中判断,若新闻因异常而结束则取消广告任务。

因为新闻和广告任务都可能抛出异常,且 async 启动的异步任务是在调用 await() 时才会抛出异常,所以它应该包裹在 try-catch 中。Kotlin 中的 try-catch 是一个表达式,即是有返回值的。这个特性让正常和异常情况的值聚合在一个表达式中。

若不使用 try-catch,程序也不会奔溃,因为 supervisorScope 中异常是不会向上传播的,即子协程的异常不会影响兄弟和父亲。但这样就少了异常情况的处理。

若现有代码都是 Callback 形式的,还能不能享受协程的简洁?

能!Kotlin 提供了suspendCoroutine(),专门用于将回调风格的代码转换成 suspend 方法,以拉取新闻为例:

// Callback 形式
fun fetchNews() 
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  ... 
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  ... 
    )


// suspend 形式
suspend fun fetchNews() = suspendCoroutine<List<News>>  continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  
            continuation.resumeWithException(t)
        
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  
            continuation.resume(response.body().result)
        
    ) 

其中的Continuation剩余的计算,从形式上看,它就是一个回调:

public interface Continuation<in T> 
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>) // 开始剩余的计算

每个 suspend 方法被编译成 java 之后,都会在原有方法参数表最后添加一个 Continuation 参数,用于表达这个挂起点之后“剩余的计算”,举个例子:

scope.launch 
    fun1() // 普通方法
    suspendFun1() // 挂起方法 
    // --------------------------
    fun2() // 普通方法
    suspendFun2() // 挂起方法
    // --------------------------

整个协程体中有四个方法,其中两个是挂起方法,每个挂起方法都是一道水平的分割线,分割线下方的代码就是当前执行点相对于整个协程体剩余的计算,这“剩余的计算”会被包装成 Continuation 并作为参数传入挂起方法。所以上述代码翻译成 java 就类似于:

scope.launch 
    fun1()
    suspendFun1(new Continuation() 
        @override
        public void resumeWith(Result<T> result) 
            fun2()
            suspendFun2(new Continuation() 
                @override
                public void resumeWith(Result<T> result) 

                
            )
        
    )

所以挂起方法无异于 java 中带回调的方法,它自然不会阻塞当前线程,它只是把协程体中剩下的代码当成回调,该回调会在将来某个时间点被执行。通过这种方式,挂起方法主动让出了 cpu 执行权。

题外话

从业务上讲,将 Callback 方法改造成挂起式可以降低业务复杂度。举个例子:用户可以通过若干动作触发拉取新闻,比如首次进入新闻页、下拉刷新新闻页、上拉加载更多新闻、切换分区。新闻页有一个埋点,当首次展示某分区时,上报此时的新闻。

若没有 suspend 方法,代码应该这样写:

// NewsViewModel.kt
fun fetchNews(isFirstLoad: Boolean, isChangeType: Boolean) 
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  ... 
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  
            // 将新闻抛给界面刷新
            newsLiveData.value = response.body.result
            // 只有当首次加载或切换分区时时才埋点
            if(isFirstLoad || isChangeType) 
                reportNews(response.body.result)
            
        
    )

// NewsActivity.kt
// 分区切换监听
tab.setOnTabChangeListener  index ->
    newsViewModel.fetchNews(false, true)

// 首次加载新闻
fun init() 
    newsViewModel.fetchNews(true, false)

// 下拉刷新
refreshLayout.setOnRefreshListener 
    newsViewModel.fetchNews(false, false)

// 上拉加载更多
refreshLayout.setOnLoadMoreListener 
    newsViewModel.fetchNews(false, false)

因为埋点需要带上新闻列表,所以必须在请求返回之后上报。不同业务场景的拉取接口是同一个,所以只能在统一的 onResponse() 中分类讨论,分类讨论依赖于标记位,不得不为 fetchNews() 添加两个参数。

如果将拉取新闻的接口改成 suspend 方式就能化解这类复杂度:

// NewsViewModel.kt
suspend fun fetchNews() = suspendCoroutine<List<News>>  continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> 
        override fun onFailure(call: Call<List<News>>, t: Throwable)  
            continuation.resumeWithException(t)
        
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>)  
            val news = response.body.result
            newsLiveData.value = news
            continuation.resume(news)
        
    ) 


// NewsActivity.kt
fun initNews() 
    scope.launch 
        val news = viewModel.fetchNews()
        reportNews(news)
    


fun changeNewsType() 
    scope.launch 
        val news = viewModel.fetchNews()
        reportNews(news)
    


fun loadMoreNews() 
    scope.launch  viewModel.fetchNews() 


fun refreshNews() 
    scope.launch  viewModel.fetchNews() 


newsViewModel.newsLiveData.observe news ->
    showNews(news)

所有界面的刷新还是走 LiveData,但拉取新闻的方法被改造成挂起之后,也会将新闻列表用类似同步的方式返回,所以可以在相关业务点进行单独埋点。

统计相册加载图片耗时

再通过一个更高并发数的场景比对下各个方案代码上的差异

测试并发加载 20 张网络图片的总耗时。该场景下已经无法使用布尔值,因为并发数太多。

CountdownLatch

var start = SystemClock.elapsedRealtime()
var imageUrls = listOf(...)
val countdownLatch = CountDownLatch(imageUrls.size)
// 另起线程等待 CountDownLatch 并输出耗时
scope.launch(Dispatchers.IO) 
    countdownLatch.await()
    Log.d( "test", "time-consume=$SystemClock.elapsedRealtime() - start" )


// 遍历 20 张图片 url
imageUrls.forEach  img ->
        ImageView // 动态构建 ImageView
            layout_width = 100
            layout_height = 100
            Glide.with(this@GlideActivity)
                .load(img)
                .listener(object : RequestListener<Drawable> 
                    override fun onLoadFailed(
                        e: GlideException?,
                        model: Any?,
                        target: Target<Drawable>?,
                        isFirstResource: Boolean
                    ): Boolean 
                        countdownLatch.countDown() // 加载完一张
                        return false
                    

                    override fun onResourceReady(
                        resource: Drawable?,
                        model: Any?,
                        target: Target<Drawable>?,
                        dataSource: DataSource?,
                        isFirstResource: Boolean
                    ): Boolean 
                        countdownLatch.countDown() // 加载完一张
                        return false
                    
                )
               .into(this)
        

协程

var imageUrls = listOf(...)
scope.launch 
    val start = SystemClock.elapsedRealtime()
    // 将每个 url 都变换为一个 Defered
    val defers = imageUrls.map  img ->
            val imageView = ImageView 
                layout_width = 100
                layout_height = 100
            
            async  imageView.loadImage(img) 
    
    defers.awaitAll()//等待所有的异步任务结束
    Log.d( "test", "time-consume=$SystemClock.elapsedRealtime() - start" )

// 将 Callback 方式的加载转换为挂起方式
private suspend fun ImageView.loadImage(img: String) = suspendCoroutine<String>  continuation ->
    Glide.with(this@GlideActivity)
        .load(img)
        .listener(object : RequestListener<Drawable> 
            override fun onLoadFailed(
                e: GlideException?,
                model: Any?,
                target: Target<Drawable>?,
                isFirstResource: Boolean
            ): Boolean 
                continuation.resume("")
                return false
            

            override fun onResourceReady(
                resource: Drawable?,
                model: Any?,
                target: Target<Drawable>?,
                dataSource: DataSource?,
                isFirstResource: Boolean
            ): Boolean 
                continuation.resume("")
                return false
            
        )
        .into(this)

你更喜欢哪种方式?

参考

Multiple Concurrent Asynchronous calls using Kotlin coroutines (async-await and suspendCoroutine) | by Priya Sindkar Shah | MindOrks | Medium

作者:唐子玄
链接:https://juejin.cn/post/7121517604644061192

最后

如果想要成为架构师或想突破20~30K薪资范畴,那就不要局限在编码,业务,要会选型、扩展,提升编程思维。此外,良好的职业规划也很重要,学习的习惯很重要,但是最重要的还是要能持之以恒,任何不能坚持落实的计划都是空谈。

如果你没有方向,这里给大家分享一套由阿里高级架构师编写的《android八大模块进阶笔记》,帮大家将杂乱、零散、碎片化的知识进行体系化的整理,让大家系统而高效地掌握Android开发的各个知识点。

相对于我们平时看的碎片化内容,这份笔记的知识点更系统化,更容易理解和记忆,是严格按照知识体系编排的。

全套视频资料:

一、面试合集

二、源码解析合集


三、开源框架合集


欢迎大家一键三连支持,若需要文中资料,直接点击文末CSDN官方认证微信卡片免费领取↓↓↓

以上是关于面试题 | 等待多个并发结果有哪几种方法?的主要内容,如果未能解决你的问题,请参考以下文章

最全多线程经典面试题和答案

最全多线程经典面试题和答案

史上最全Java多线程面试题及答案

史上最全Java多线程面试60题,含答案大赠送!

史上最全Java多线程面试60题,含答案大赠送!

java算法面试题:排序都有哪几种方法?请列举。用JAVA实现一个快速排序。选择冒泡快速集合至少4种方法排序