有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?
Posted
技术标签:
【中文标题】有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?【英文标题】:Is there a way to achieve this rx flow in Kotlin with coroutines/Flow/Channels? 【发布时间】:2020-01-08 13:48:48 【问题描述】:我是第一次尝试 Kotlin Coroutines 和 Flow,我正在尝试使用 MVI-ish 方法重现我在带有 RxJava 的 android 上使用的某个流程,但我很难正确处理,而且我基本上被卡住了此时。
RxJava 应用程序看起来基本上是这样的:
MainActivityView.kt
object MainActivityView
sealed class Event
object OnViewInitialised : Event()
data class State(
val renderEvent: RenderEvent = RenderEvent.None
)
sealed class RenderEvent
object None : RenderEvent()
class DisplayText(val text: String) : RenderEvent()
MainActivity.kt
MainActivity 有一个PublishSubject
的实例,其类型为Event
。即MainActivityView.Event.OnViewInitialised
、MainActivityView.Event.OnError
等。初始事件通过主题的.onNext(Event)
调用在onCreate()
中发送。
@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main)
@Inject
lateinit var subscriptions: CompositeDisposable
@Inject
lateinit var viewModel: MainActivityViewModel
@Inject
lateinit var onViewInitialisedSubject: PublishSubject<MainActivityView.Event.OnViewInitialised>
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setupEvents()
override fun onDestroy()
super.onDestroy()
subscriptions.clear()
private fun setupEvents()
if (subscriptions.size() == 0)
Observable.mergeArray(
onViewInitialisedSubject
.toFlowable(BackpressureStrategy.BUFFER)
.toObservable()
).observeOn(
Schedulers.io()
).compose(
viewModel()
).observeOn(
AndroidSchedulers.mainThread()
).subscribe(
::render
).addTo(
subscriptions
)
onViewInitialisedSubject
.onNext(
MainActivityView
.Event
.OnViewInitialised
)
private fun render(state: MainActivityView.State)
when (state.renderEvent)
MainActivityView.RenderEvent.None -> Unit
is MainActivityView.RenderEvent.DisplayText ->
mainActivityTextField.text = state.renderEvent.text
MainActivityViewModel.kt
然后这些Event
被MainActivityViewModel
类拾取,该类由.compose(viewModel())
调用,然后通过ObservableTransformer<Event, State>
将接收到的Event
转换为一种新的State
。视图模型返回一个带有renderEvent
的新状态,然后可以通过render(state: MainActivityView.State)
函数再次在MainActivity
中对其进行操作。
@MainActivityScope
class MainActivityViewModel @Inject constructor(
private var state: MainActivityView.State
)
operator fun invoke(): ObservableTransformer<MainActivityView.Event, MainActivityView.State> = onEvent
private val onEvent = ObservableTransformer<MainActivityView.Event,
MainActivityView.State> upstream: Observable<MainActivityView.Event> ->
upstream.publish shared: Observable<MainActivityView.Event> ->
Observable.mergeArray(
shared.ofType(MainActivityView.Event.OnViewInitialised::class.java)
).compose(
eventToViewState
)
private val eventToViewState = ObservableTransformer<MainActivityView.Event, MainActivityView.State> upstream ->
upstream.flatMap event ->
when (event)
MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
private fun onViewInitialisedEvent(): Observable<MainActivityView.State>
val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
state = state.copy(renderEvent = renderEvent)
return state.asObservable()
我可以使用协程/流/通道实现相同的流程吗?甚至可能有点简化?
编辑:
我已经找到了适合我的解决方案,到目前为止我还没有发现任何问题。然而,这个解决方案使用ConflatedBroadcastChannel<T>
,最终将被弃用,它可能会用(在撰写本文时)尚未发布的SharedFlow
api 替换它(更多关于here。
它的工作方式是 Activity
和 viewmodel 共享
一个ConflatedBroadcastChannel<MainActivity.Event>
,用于从Activity
(或适配器)发送或提供事件。视图模型将事件减少到一个新的状态,然后发出。 Activity
正在收集由viewModel.invoke()
返回的Flow<State>
,并最终呈现发出的State
。
MainActivityView.kt
object MainActivityView
sealed class Event
object OnViewInitialised : Event()
data class OnButtonClicked(val idOfItemClicked: Int) : Event()
data class State(
val renderEvent: RenderEvent = RenderEvent.Idle
)
sealed class RenderEvent
object Idle : RenderEvent()
data class DisplayText(val text: String) : RenderEvent()
MainActivity.kt
class MainActivity : AppCompatActivity(R.layout.activity_main)
@Inject
lateinit var viewModel: MainActivityViewModel
@Inject
lateinit eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>
private var isInitialised: Boolean = false
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
init()
private fun init()
if (!isInitialised)
lifecycleScope.launch
viewModel()
.flowOn(
Dispatchers.IO
).collect(::render)
eventChannel
.offer(
MainActivityView.Event.OnViewInitialised
)
isInitialised = true
private suspend fun render(state: MainActivityView.State): Unit =
when (state.renderEvent)
MainActivityView.RenderEvent.Idle -> Unit
is MainActivityView.RenderEvent.DisplayText ->
renderDisplayText(text = state.renderEvent.text)
private val renderDisplayText(text: String)
// render text
MainActivityViewModel.kt
class MainActivityViewModel constructor(
private var state: MainActivityView.State = MainActivityView.State(),
private val eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>,
)
suspend fun invoke(): Flow<MainActivityView.State> =
eventChannel
.asFlow()
.flatMapLatest event: MainActivityView.Event ->
reduce(event)
private fun reduce(event: MainActivityView.Event): Flow<MainActivityView.State> =
when (event)
MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
MainActivityView.Event.OnButtonClicked -> onButtonClickedEvent(event.idOfItemClicked)
private fun onViewInitialisedEvent(): Flow<MainActivityView.State> = flow
val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
state = state.copy(renderEvent = renderEvent)
emit(state)
private fun onButtonClickedEvent(idOfItemClicked: Int): Flow<MainActivityView.State> = flow
// do something to handle click
println("item clicked: $idOfItemClicked")
emit(state)
类似问题:
publishsubject-with-kotlin-coroutines-flow【问题讨论】:
是的,您可以通过BroadcastChannel
和asFlow()
来实现这一点。
你能举个例子吗?我无法理解 BroadcastChannel 的工作原理。
【参考方案1】:
您的MainActivity
可能看起来像这样。
@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main)
@Inject
lateinit var subscriptions: CompositeDisposable
@Inject
lateinit var viewModel: MainActivityViewModel
@Inject
lateinit var onViewInitialisedChannel: BroadcastChannel<MainActivityView.Event.OnViewInitialised>
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setupEvents()
override fun onDestroy()
super.onDestroy()
subscriptions.clear()
private fun setupEvents()
if (subscriptions.size() == 0)
onViewInitialisedChannel.asFlow()
.buffer()
.flowOn(Dispatchers.IO)
.onEach(::render)
.launchIn(GlobalScope)
onViewInitialisedChannel
.offer(
MainActivityView
.Event
.OnViewInitialised
)
private fun render(state: MainActivityView.State)
when (state.renderEvent)
MainActivityView.RenderEvent.None -> Unit
is MainActivityView.RenderEvent.DisplayText ->
mainActivityTextField.text = state.renderEvent.text
【讨论】:
多米尼克,谢谢。但也许我不够清楚,或者我误解了,但我不明白这如何适合我遇到的问题。问题是将“事件”分派给“视图模型”,让视图模型将事件转换为新状态,然后由片段中的渲染函数返回并渲染。我没有按照您的示例建议立即渲染“事件”类,但是渲染的是vm返回的状态。在您的示例中,我没有看到对 vm 的任何调用,如果还有更多事件怎么办?例如 OnButtonClickEvent 等。【参考方案2】:我认为您正在寻找的是 compose
和 ObservableTransformer
的 Flow 版本,据我所知没有。您可以改用 let
运算符并执行 类似 的操作:
主活动:
yourFlow
.let(viewModel::invoke)
.onEach(::render)
.launchIn(lifecycleScope) // or viewLifecycleOwner.lifecycleScope if you're in a fragment
视图模型:
operator fun invoke(viewEventFlow: Flow<Event>): Flow<State> = viewEventFlow.flatMapLatest event ->
when (event)
Event.OnViewInitialised -> flowOf(onViewInitialisedEvent())
就分享流程而言,我会关注这些问题:
https://github.com/Kotlin/kotlinx.coroutines/issues/2034 https://github.com/Kotlin/kotlinx.coroutines/issues/2047Dominic 的回答可能适用于替换发布主题,但我认为协程团队正在远离 BroadcastChannel
并打算在不久的将来弃用它。
【讨论】:
抱歉没有及时回复。我已经用我目前使用的解决方案更新了我的帖子。它确实使用了 ConflatedBroadcastChannelkotlinx-coroutines-core 提供了一个transform
函数。
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
它与我们在 RxJava 中使用的不太一样,但应该可以用于实现相同的结果。
【讨论】:
以上是关于有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?的主要内容,如果未能解决你的问题,请参考以下文章
有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?