有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?

Posted

技术标签:

【中文标题】有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?【英文标题】:Is there a way to achieve this rx flow in Kotlin with coroutines/Flow/Channels? 【发布时间】:2020-01-08 13:48:48 【问题描述】:

我是第一次尝试 Kotlin Coroutines 和 Flow,我正在尝试使用 MVI-ish 方法重现我在带有 RxJava 的 android 上使用的某个流程,但我很难正确处理,而且我基本上被卡住了此时。

RxJava 应用程序看起来基本上是这样的:

MainActivityView.kt

object MainActivityView 

    sealed class Event 
        object OnViewInitialised : Event()
    

    data class State(
        val renderEvent: RenderEvent = RenderEvent.None
    )

    sealed class RenderEvent 
        object None : RenderEvent()
        class DisplayText(val text: String) : RenderEvent()
    

MainActivity.kt

MainActivity 有一个PublishSubject 的实例,其类型为Event。即MainActivityView.Event.OnViewInitialisedMainActivityView.Event.OnError 等。初始事件通过主题的.onNext(Event) 调用在onCreate() 中发送。

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) 

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedSubject: PublishSubject<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setupEvents()
    

    override fun onDestroy() 
        super.onDestroy()
        subscriptions.clear()
    

    private fun setupEvents() 
        if (subscriptions.size() == 0) 
            Observable.mergeArray(
                onViewInitialisedSubject
                    .toFlowable(BackpressureStrategy.BUFFER)
                    .toObservable()
            ).observeOn(
                Schedulers.io()
            ).compose(
                viewModel()
            ).observeOn(
                AndroidSchedulers.mainThread()
            ).subscribe(
                ::render
            ).addTo(
                subscriptions
            )

            onViewInitialisedSubject
                .onNext(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        
    

    private fun render(state: MainActivityView.State) 
        when (state.renderEvent) 
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> 
                mainActivityTextField.text = state.renderEvent.text
            
        
    


MainActivityViewModel.kt

然后这些EventMainActivityViewModel 类拾取,该类由.compose(viewModel()) 调用,然后通过ObservableTransformer&lt;Event, State&gt; 将接收到的Event 转换为一种新的State。视图模型返回一个带有renderEvent 的新状态,然后可以通过render(state: MainActivityView.State) 函数再次在MainActivity 中对其进行操作。

@MainActivityScope
class MainActivityViewModel @Inject constructor(
    private var state: MainActivityView.State
) 

    operator fun invoke(): ObservableTransformer<MainActivityView.Event, MainActivityView.State> = onEvent

    private val onEvent = ObservableTransformer<MainActivityView.Event,
        MainActivityView.State>  upstream: Observable<MainActivityView.Event> ->
        upstream.publish  shared: Observable<MainActivityView.Event> ->
            Observable.mergeArray(
                shared.ofType(MainActivityView.Event.OnViewInitialised::class.java)
            ).compose(
                eventToViewState
            )
        
    

    private val eventToViewState = ObservableTransformer<MainActivityView.Event, MainActivityView.State>  upstream ->
        upstream.flatMap  event ->
            when (event) 
                MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            
        
    

    private fun onViewInitialisedEvent(): Observable<MainActivityView.State> 
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        return state.asObservable()
    


我可以使用协程/流/通道实现相同的流程吗?甚至可能有点简化?

编辑:

我已经找到了适合我的解决方案,到目前为止我还没有发现任何问题。然而,这个解决方案使用ConflatedBroadcastChannel&lt;T&gt;,最终将被弃用,它可能会用(在撰写本文时)尚未发布的SharedFlow api 替换它(更多关于here。

它的工作方式是 Activity 和 viewmodel 共享 一个ConflatedBroadcastChannel&lt;MainActivity.Event&gt;,用于从Activity(或适配器)发送或提供事件。视图模型将事件减少到一个新的状态,然后发出。 Activity 正在收集由viewModel.invoke() 返回的Flow&lt;State&gt;,并最终呈现发出的State

MainActivityView.kt

object MainActivityView 

    sealed class Event 
        object OnViewInitialised : Event()
        data class OnButtonClicked(val idOfItemClicked: Int) : Event()
    

    data class State(
        val renderEvent: RenderEvent = RenderEvent.Idle
    )

    sealed class RenderEvent 
        object Idle : RenderEvent()
        data class DisplayText(val text: String) : RenderEvent()
    

MainActivity.kt

class MainActivity : AppCompatActivity(R.layout.activity_main) 

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>

    private var isInitialised: Boolean = false

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)

        init()
    
    
    private fun init() 
        if (!isInitialised) 
            
            lifecycleScope.launch 
                viewModel()
                    .flowOn(
                        Dispatchers.IO
                    ).collect(::render)
            

            eventChannel
                .offer(
                    MainActivityView.Event.OnViewInitialised
                )
            isInitialised = true
        
    

    private suspend fun render(state: MainActivityView.State): Unit =
        when (state.renderEvent) 
            MainActivityView.RenderEvent.Idle -> Unit
            is MainActivityView.RenderEvent.DisplayText -> 
                renderDisplayText(text = state.renderEvent.text)
            
        

    private val renderDisplayText(text: String) 
        // render text
    


MainActivityViewModel.kt

class MainActivityViewModel constructor(
    private var state: MainActivityView.State = MainActivityView.State(),
    private val eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>,
 ) 

    suspend fun invoke(): Flow<MainActivityView.State> =
        eventChannel
            .asFlow()
            .flatMapLatest  event: MainActivityView.Event ->
                reduce(event)
            

    private fun reduce(event: MainActivityView.Event): Flow<MainActivityView.State> =
        when (event) 
            MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            MainActivityView.Event.OnButtonClicked -> onButtonClickedEvent(event.idOfItemClicked)
        

    private fun onViewInitialisedEvent(): Flow<MainActivityView.State> = flow 
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        emit(state)
    

    private fun onButtonClickedEvent(idOfItemClicked: Int): Flow<MainActivityView.State> = flow 
        // do something to handle click
        println("item clicked: $idOfItemClicked")
        emit(state)
    


类似问题:

publishsubject-with-kotlin-coroutines-flow

【问题讨论】:

是的,您可以通过BroadcastChannelasFlow() 来实现这一点。 你能举个例子吗?我无法理解 BroadcastChannel 的工作原理。 【参考方案1】:

您的MainActivity 可能看起来像这样。

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) 

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedChannel: BroadcastChannel<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setupEvents()
    

    override fun onDestroy() 
        super.onDestroy()
        subscriptions.clear()
    

    private fun setupEvents() 
        if (subscriptions.size() == 0) 
            onViewInitialisedChannel.asFlow()
                .buffer()
                .flowOn(Dispatchers.IO)
                .onEach(::render)
                .launchIn(GlobalScope)

            onViewInitialisedChannel
                .offer(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        
    

    private fun render(state: MainActivityView.State) 
        when (state.renderEvent) 
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> 
                mainActivityTextField.text = state.renderEvent.text
            
        
    


【讨论】:

多米尼克,谢谢。但也许我不够清楚,或者我误解了,但我不明白这如何适合我遇到的问题。问题是将“事件”分派给“视图模型”,让视图模型将事件转换为新状态,然后由片段中的渲染函数返回并渲染。我没有按照您的示例建议立即渲染“事件”类,但是渲染的是vm返回的状态。在您的示例中,我没有看到对 vm 的任何调用,如果还有更多事件怎么办?例如 OnButtonClickEvent 等。【参考方案2】:

我认为您正在寻找的是 composeObservableTransformer 的 Flow 版本,据我所知没有。您可以改用 let 运算符并执行 类似 的操作:

主活动:

yourFlow
  .let(viewModel::invoke)
  .onEach(::render)
  .launchIn(lifecycleScope) // or viewLifecycleOwner.lifecycleScope if you're in a fragment

视图模型:

operator fun invoke(viewEventFlow: Flow<Event>): Flow<State> = viewEventFlow.flatMapLatest  event ->
  when (event) 
    Event.OnViewInitialised -> flowOf(onViewInitialisedEvent())
  

就分享流程而言,我会关注这些问题:

https://github.com/Kotlin/kotlinx.coroutines/issues/2034 https://github.com/Kotlin/kotlinx.coroutines/issues/2047

Dominic 的回答可能适用于替换发布主题,但我认为协程团队正在远离 BroadcastChannel 并打算在不久的将来弃用它。

【讨论】:

抱歉没有及时回复。我已经用我目前使用的解决方案更新了我的帖子。它确实使用了 ConflatedBroadcastChannel API(如您所指出的),该 API 将在未来被弃用。希望可以用 SharedFlow 替换它。【参考方案3】:

kotlinx-coroutines-core 提供了一个transform 函数。

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html

它与我们在 RxJava 中使用的不太一样,但应该可以用于实现相同的结果。

【讨论】:

以上是关于有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?

有没有办法以编程方式使用kotlin更改片段中的文本颜色?

有没有办法在 Kotlin 活动和 Java 活动之间获取结果?

有没有办法在 kotlin 轻松打开和关闭流? [复制]

有没有办法直接在界面中解析改造响应,用函数解析响应

Kotlin 与 JavaFXPorts 的兼容性