为啥我不能在 Kotlin Flow 中使用像 rxJava.Single.create 这样的发射函数?

Posted

技术标签:

【中文标题】为啥我不能在 Kotlin Flow 中使用像 rxJava.Single.create 这样的发射函数?【英文标题】:Why I can't use an emit function in Kotlin Flow like rxJava.Single.create?为什么我不能在 Kotlin Flow 中使用像 rxJava.Single.create 这样的发射函数? 【发布时间】:2020-03-18 07:02:54 【问题描述】:

我正在尝试将带有 rxjava 链的交互器重写为 kotlin 流。在 LocationHandlerImpl 中,我使用 LocationService 获取当前位置。在 addOnSuccessListener 和 addOnFailureListener 我正在发射我的模型但有错误:

“只能在协程体内调用暂停函数”。我做错了吗?但我可以在侦听器之外调用 emit(请看下面的 flow builder)

【问题讨论】:

【参考方案1】:

您似乎正在尝试从 android 定位服务获取最后一个位置。这是 Google Play 服务中许多Task-returning 调用之一。 Kotlin 已经有一个模块,kotlinx-coroutines-play-services,它提供了一个函数

suspend fun <T> Task<T>.await(): T?

在你的项目中,你可以简单地这样写:

suspend fun getMyLocation(): Location? =
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .await()

如果您想将其与其他基于Flow 的代码集成,请添加此包装函数:

fun <T> Task<T>.asFlow() = flow  emit(await()) 

现在你可以写了

fun getLocationAsFlow(): Flow<Location?> =
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .asFlow()

如果出于教育目的,您想看看如何在不使用附加模块的情况下直接实现它,那么最直接的方法如下:

fun getLocationAsFlow() = flow 
    val location = suspendCancellableCoroutine<Location?>  cont ->
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .addOnCompleteListener 
                    val e = exception
                    when 
                        e != null -> cont.resumeWithException(e)
                        isCanceled -> cont.cancel()
                        else -> cont.resume(result)
                    
                
    
    emit(location)

这是将Task.await() 的简化实现内联到其使用站点的结果。

【讨论】:

为什么一定要suspend函数? 对于上下文,您指的是getLocationAsFlow(),它不必是可暂停的。已删除。【参考方案2】:

正如 Prokash (here) 所述,流被设计为自包含的。您的位置服务侦听器不在 Flow 范围内。

不过,您可以查看callbackFlow,它为您提供了使用基于回调的 API 构建流程所需的机制。

Callback Flow Documentation,请注意回调流程仍处于实验阶段。

【讨论】:

它对我有用。错误消失了,我不得不添加关于实验api的注释 @IliyaMashin 回调流对你有用吗?如果是,您可以标记正确答案,以便将您的问题标记为已解决。 它没有。编译错误消失了,但我在运行时遇到了新的异常 ClosedSendChannelException。当我打电话给offer(T) 你能发布一些关于你的代码当前状态的实际代码sn-ps吗?这对我们有很大帮助。 对不起。这是我的错。我只是忘记了“awaitClose”。它可以工作,但我无法进入 onCompletion 块

以上是关于为啥我不能在 Kotlin Flow 中使用像 rxJava.Single.create 这样的发射函数?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?

为啥我们不能在 Kotlin 的 Singleton 类(对象)中使用受保护的访问修饰符

我如何在 swift Kotlin 多平台上使用 Flow?

打造一个 Kotlin Flow 版的 EventBus

Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )

Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )