如何将 kotlin 流转换为可变流?

Posted

技术标签:

【中文标题】如何将 kotlin 流转换为可变流?【英文标题】:How can you convert a kotlin flow to a mutable flow? 【发布时间】:2021-12-22 23:55:27 【问题描述】:

我试图在我的班级中保持一个可变的状态流,但是当我对其应用任何方法时,它将被转换为不可变的Flow<T>

class MyClass : Listener<String> 

       private val source = Source()

       val flow: Flow<String?>
          get() = _flow

       // region listener
        override fun onUpdate(value: String?) 
            if (value!= null) 
                // emit object changes to the flow
               // not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
                _flow.tryEmit(value) 
            
        
        // end-region

        @OptIn(ExperimentalCoroutinesApi::class)
        private val _flow by lazy 
            MutableStateFlow<String?>(null).onStart 
                emitAll(
                    flow<String?> 
                        val initialValue = source.getInitialValue()
                        emit(initialValue)
                    .flowOn(MyDispatchers.background)
                )
            .onCompletion  error ->
                // when the flow is cancelled, stop listening to changes
                if (error is CancellationException) 
                    // is was cancelled
                    source.removeListener(this@MyClass)
                
            .apply 
                // listen to changes and send them to the flow
                source.addListener(this@MyClass)
            
        

即使在我应用onCompletion/onStart 方法后,有没有办法将流保持为MutableStateFlow

【问题讨论】:

您希望它如何工作?对流应用转换后,生成的流使用第一个流作为其源。那么,生成的流如何从另一个流复制数据,同时又是直接可变的呢?或者,也许您只是想应用一些转换,但返回源流? 我想要一个可以在我的班级中发出的流,监听更改并将这些更改从这个班级中发出到流中,并且还监听流何时被取消,这样我就可以停止收听变化。好像没有办法,有的话可以给我看看吗? source.getInitialValue() 暂停了吗? 【参考方案1】:

如果您将转换应用于可变状态流,则生成的流将变为只读流,因为原始流充当其源。如果要手动发出事件,则需要将它们发出到初始源流。

话虽如此,您想要在这里实现的目标似乎很简单:将基于回调的 API 桥接到 Flow API。 Kotlin 协程中有一个内置函数可以做到这一点,称为callbackFlow。

我不确定您的源 API 如何处理背压,但它看起来像这样:

@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow 
    send(getInitialValue())

    val listener = object : Listener<String> 
        override fun onUpdate(value: String?) 
            if (value != null) 
                trySend(value)
            
        
    
    addListener(listener)
    awaitClose 
        removeListener(listener)
    

或者可能使用runBlocking send(value) 而不是trySend(),这取决于Source 如何在自己的线程池中处理背压和阻塞。

请注意,flowOn 可能会在此流程之上使用,但它只对getInitialValue() 很重要,因为执行回调的线程无论如何都由Source 控制。

如果为Source 添加许多侦听器的成本很高,您还可以考虑使用shareIn() 运算符共享此流,以便多个订阅者共享同一个侦听器订阅。

【讨论】:

以上是关于如何将 kotlin 流转换为可变流?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Java 源文件的一部分转换为 Kotlin?

如何将此 Java 代码转换为 kotlin 代码

如何将 C# 代码转换为 Kotlin 由 Json 库组成

如何将改造回调响应中的回调从 java 转换为 Kotlin

对比Java学Kotlin协程-异步流

对比Java学Kotlin协程-异步流