使用协程的 Firebase 实时快照监听器
Posted
技术标签:
【中文标题】使用协程的 Firebase 实时快照监听器【英文标题】:Firebase realtime snapshot listener using Coroutines 【发布时间】:2019-08-22 21:43:29 【问题描述】:我希望能够在我的 ViewModel 中使用 Kotlin 协程来收听 Firebase DB 中的实时更新。
问题在于,每当在集合中创建新消息时,我的应用程序都会冻结并且无法从该状态中恢复。我需要杀死它并重新启动应用程序。
这是第一次通过,我可以在 UI 上看到之前的消息。第二次调用SnapshotListener
时会出现此问题。
我的observer()
函数
val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener data, error ->
if (error != null)
channel.close(error)
else
if (data != null)
val messages = data.toObjects(MessageEntity::class.java)
//till this point it gets executed^^^^
channel.sendBlocking(messages)
else
channel.close(CancellationException("No data received"))
return channel
这就是我想要观察消息的方式
launch(Dispatchers.IO)
val newMessages =
messageRepository
.observer()
.receive()
在我用send()
替换sendBlocking()
之后,我仍然没有在频道中收到任何新消息。 SnapshotListener
方被执行
//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO)
channel.send(messages)
//scope is my viewModel
如何使用 Kotlin 协程观察 firestore/realtime-dbs 中的消息?
【问题讨论】:
Firebase 回调默认在主线程上执行。我看到你在主线程上调用了一个名为sendBlocking
的方法。阻塞主线程总是一个坏主意。您需要找到另一种使用 Firebase SDK 的方法,而不是像这样阻塞主线程。
@DougStevenson 我找到了解决方案
【参考方案1】:
我有这些扩展函数,所以我可以简单地从查询中获取结果作为流。
Flow 是一个完美的 Kotlin 协程结构。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?>
return callbackFlow
val listenerRegistration =
addSnapshotListener querySnapshot, firebaseFirestoreException ->
if (firebaseFirestoreException != null)
cancel(
message = "error fetching collection data at path - $path",
cause = firebaseFirestoreException
)
return@addSnapshotListener
offer(querySnapshot)
awaitClose
Timber.d("cancelling the listener on collection at path - $path")
listenerRegistration.remove()
@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T>
return getQuerySnapshotFlow()
.map
return@map mapper(it)
以下是如何使用上述功能的示例。
@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>>
return FirebaseFirestore.getInstance()
.collection("$COLLECTION_SHOPPING_LIST")
.getDataFlow querySnapshot ->
querySnapshot?.documents?.map
getShoppingListItemFromSnapshot(it)
?: listOf()
// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem
return documentSnapshot.toObject(ShoppingListItem::class.java)!!
在您的 ViewModel 类(或您的 Fragment)中,确保您从正确的范围内调用它,以便在用户离开屏幕时适当地移除侦听器。
viewModelScope.launch
getShoppingListItemsFlow().collect
// Show on the view.
【讨论】:
你在 getShoppingListItemFromSnapshot(it) 中做了什么? :)offer(element)
方法(从 KotlinX Coroutines 1.5.0+ 开始)似乎已被 trySend(element).isSuccess
弃用,根据 source code(还引用了 GitHub 问题:github.com/Kotlin/kotlinx.coroutines/issues/974 )【参考方案2】:
最后我使用了Flow,它是协同程序的一部分1.2.0-alpha-2
return flowViaChannel channel ->
firestore.collection(path).addSnapshotListener data, error ->
if (error != null)
channel.close(error)
else
if (data != null)
val messages = data.toObjects(MessageEntity::class.java)
channel.sendBlocking(messages)
else
channel.close(CancellationException("No data received"))
channel.invokeOnClose
it?.printStackTrace()
这就是我在 ViewModel 中观察它的方式
launch
messageRepository.observe().collect
//process
更多主题https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
【讨论】:
【参考方案3】:移除回调的扩展函数
对于 Firebase 的 Firestore 数据库,有两种调用类型。
-
一次性请求 -
addOnCompleteListener
实时更新 - addSnapshotListener
一次性请求
对于一次性请求,库org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X
提供了一个await
扩展函数。该函数从addOnCompleteListener
返回结果。
有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services。
资源
Using Firebase on android with Kotlin CoroutinesJoe Birch Using Kotlin Extension Functions and Coroutines with FirebaseRosário Pereira Fernandes实时更新
扩展函数awaitRealtime
进行检查,包括验证continuation
的状态,以查看它是否处于isActive
状态。这一点很重要,因为当用户的主要内容提要通过生命周期事件更新、手动刷新提要或从提要中删除内容时,会调用该函数。没有这个检查就会崩溃。
ExtensionFuction.kt
data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)
suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> continuation ->
addSnapshotListener( value, error ->
if (error == null && continuation.isActive)
continuation.resume(QueryResponse(value, null))
else if (error != null && continuation.isActive)
continuation.resume(QueryResponse(null, error))
)
为了处理错误,使用了try
/catch
模式。
Repository.kt
object ContentRepository
fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>>
emit(Loading())
val labeledSet = HashSet<String>()
val user = usersDocument.collection(getInstance().currentUser!!.uid)
syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
getLoggedInNonRealtimeContent(timeframe, labeledSet, this)
// Realtime updates with 'awaitRealtime' used
private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
labeledSet: HashSet<String>, collection: String,
lce: FlowCollector<Lce<PagedListResult>>)
val response = user.document(COLLECTIONS_DOCUMENT)
.collection(collection)
.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
.awaitRealtime()
if (response.error == null)
val contentList = response.packet?.documentChanges?.map doc ->
doc.document.toObject(Content::class.java).also content ->
labeledSet.add(content.id)
database.contentDao().insertContentList(contentList)
else lce.emit(Error(PagedListResult(null,
"Error retrieving user save_collection: $response.error?.localizedMessage")))
// One time updates with 'await' used
private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
labeledSet: HashSet<String>,
lce: FlowCollector<Lce<PagedListResult>>) =
try
database.contentDao().insertContentList(
contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
.documentChanges
?.map change -> change.document.toObject(Content::class.java)
?.filter content -> !labeledSet.contains(content.id) )
lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
catch (error: FirebaseFirestoreException)
lce.emit(Error(PagedListResult(
null,
CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "$error.localizedMessage")))
【讨论】:
我尝试了您的 awaitRealtime() 扩展功能,但它对我不起作用。它只在 Firestore 中与实际模型一起发射一次。在发出 continuation.isActive() 之后总是返回 false。你知道为什么/如何让它保持活跃吗? 感谢@elementstyle 的反馈!我昨晚注意到了这个问题,现在正在努力解决它。 我已删除条件!value!!.isEmpty
已解决问题。我将全天测试以确保它正常工作。
嗨@elementstyle,修复在24小时测试后工作。如果它也适用于您自己,请投票赞成答案。谢谢!
感谢您让我知道@Andrew。如果有机会,我将重新查看开源 Coinverse 应用程序中的代码的当前实现,并在必要时在此处更新。【参考方案4】:
这对我有用:
suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit)
val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)
val listenerRegistration = this.addSnapshotListener value, error ->
channel.sendBlocking(Pair(value, error))
try
block
val (value, error) = channel.receive()
if (error != null)
throw error
value
finally
channel.close()
listenerRegistration.remove()
然后你可以像这样使用它:
docRef.observe getNextSnapshot ->
while (true)
val value = getNextSnapshot() ?: continue
// do whatever you like with the database snapshot
如果观察者块抛出错误,或者块结束,或者你的协程被取消,监听器会被自动移除。
【讨论】:
以上是关于使用协程的 Firebase 实时快照监听器的主要内容,如果未能解决你的问题,请参考以下文章
Firestore:如果两个侦听器监听相同的查询 firebase 维护两个不同的查询快照?
我们可以在 ios serviceworker 上运行 firebase 实时监听吗?