使用协程的 Firebase 实时快照监听器

Posted

技术标签:

【中文标题】使用协程的 Firebase 实时快照监听器【英文标题】:Firebase realtime snapshot listener using Coroutines 【发布时间】:2019-08-22 21:43:29 【问题描述】:

我希望能够在我的 ViewModel 中使用 Kotlin 协程来收听 Firebase DB 中的实时更新。

问题在于,每当在集合中创建新消息时,我的应用程序都会冻结并且无法从该状态中恢复。我需要杀死它并重新启动应用程序。

这是第一次通过,我可以在 UI 上看到之前的消息。第二次调用SnapshotListener 时会出现此问题。

我的observer() 函数

val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener  data, error ->
    if (error != null) 
        channel.close(error)
     else 
        if (data != null) 
            val messages = data.toObjects(MessageEntity::class.java)
            //till this point it gets executed^^^^
            channel.sendBlocking(messages)
         else 
            channel.close(CancellationException("No data received"))
        
    

return channel

这就是我想要观察消息的方式

launch(Dispatchers.IO) 
        val newMessages =
            messageRepository
                .observer()
                .receive()
    

在我用send() 替换sendBlocking() 之后,我仍然没有在频道中收到任何新消息。 SnapshotListener方被执行

//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) 
    channel.send(messages)

//scope is my viewModel

如何使用 Kotlin 协程观察 firestore/realtime-dbs 中的消息?

【问题讨论】:

Firebase 回调默认在主线程上执行。我看到你在主线程上调用了一个名为sendBlocking 的方法。阻塞主线程总是一个坏主意。您需要找到另一种使用 Firebase SDK 的方法,而不是像这样阻塞主线程。 @DougStevenson 我找到了解决方案 【参考方案1】:

我有这些扩展函数,所以我可以简单地从查询中获取结果作为流。

Flow 是一个完美的 Kotlin 协程结构。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> 
    return callbackFlow 
        val listenerRegistration =
            addSnapshotListener  querySnapshot, firebaseFirestoreException ->
                if (firebaseFirestoreException != null) 
                    cancel(
                        message = "error fetching collection data at path - $path",
                        cause = firebaseFirestoreException
                    )
                    return@addSnapshotListener
                
                offer(querySnapshot)
            
        awaitClose 
            Timber.d("cancelling the listener on collection at path - $path")
            listenerRegistration.remove()
        
    


@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> 
    return getQuerySnapshotFlow()
        .map 
            return@map mapper(it)
        

以下是如何使用上述功能的示例。

@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> 
    return FirebaseFirestore.getInstance()
        .collection("$COLLECTION_SHOPPING_LIST")
        .getDataFlow  querySnapshot ->
            querySnapshot?.documents?.map 
                getShoppingListItemFromSnapshot(it)
             ?: listOf()
        


// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem 
        return documentSnapshot.toObject(ShoppingListItem::class.java)!!
    

在您的 ViewModel 类(或您的 Fragment)中,确保您从正确的范围内调用它,以便在用户离开屏幕时适当地移除侦听器。

viewModelScope.launch 
   getShoppingListItemsFlow().collect
     // Show on the view.
   

【讨论】:

你在 getShoppingListItemFromSnapshot(it) 中做了什么? :) offer(element) 方法(从 KotlinX Coroutines 1.5.0+ 开始)似乎已被 trySend(element).isSuccess 弃用,根据 source code(还引用了 GitHub 问题:github.com/Kotlin/kotlinx.coroutines/issues/974 )【参考方案2】:

最后我使用了Flow,它是协同程序的一部分1.2.0-alpha-2

return flowViaChannel  channel ->
   firestore.collection(path).addSnapshotListener  data, error ->
        if (error != null) 
            channel.close(error)
         else 
            if (data != null) 
                val messages = data.toObjects(MessageEntity::class.java)
                channel.sendBlocking(messages)
             else 
                channel.close(CancellationException("No data received"))
            
        
    
    channel.invokeOnClose 
        it?.printStackTrace()
    
 

这就是我在 ViewModel 中观察它的方式

launch 
    messageRepository.observe().collect 
        //process
    

更多主题https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9

【讨论】:

【参考方案3】:

移除回调的扩展函数

对于 Firebase 的 Firestore 数据库,有两种调用类型。

    一次性请求 - addOnCompleteListener 实时更新 - addSnapshotListener

一次性请求

对于一次性请求,库org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X 提供了一个await 扩展函数。该函数从addOnCompleteListener返回结果。

有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services

资源

Using Firebase on android with Kotlin CoroutinesJoe Birch Using Kotlin Extension Functions and Coroutines with FirebaseRosário Pereira Fernandes

实时更新

扩展函数awaitRealtime 进行检查,包括验证continuation 的状态,以查看它是否处于isActive 状态。这一点很重要,因为当用户的主要内容提要通过生命周期事件更新、手动刷新提要或从提要中删除内容时,会调用该函数。没有这个检查就会崩溃。

ExtensionFuction.kt

data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)

suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse>  continuation ->
    addSnapshotListener( value, error ->
        if (error == null && continuation.isActive)
            continuation.resume(QueryResponse(value, null))
        else if (error != null && continuation.isActive)
            continuation.resume(QueryResponse(null, error))
    )

为了处理错误,使用了try/catch 模式。

Repository.kt

object ContentRepository 
    fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> 
        emit(Loading())
        val labeledSet = HashSet<String>()
        val user = usersDocument.collection(getInstance().currentUser!!.uid)
        syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
        getLoggedInNonRealtimeContent(timeframe, labeledSet, this)        
    
    // Realtime updates with 'awaitRealtime' used
    private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                       labeledSet: HashSet<String>, collection: String,
                                       lce: FlowCollector<Lce<PagedListResult>>) 
        val response = user.document(COLLECTIONS_DOCUMENT)
            .collection(collection)
            .orderBy(TIMESTAMP, DESCENDING)
            .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
            .awaitRealtime()
        if (response.error == null) 
            val contentList = response.packet?.documentChanges?.map  doc ->
                doc.document.toObject(Content::class.java).also  content ->
                    labeledSet.add(content.id)
                
            
            database.contentDao().insertContentList(contentList)
         else lce.emit(Error(PagedListResult(null,
            "Error retrieving user save_collection: $response.error?.localizedMessage")))
    
    // One time updates with 'await' used
    private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                      labeledSet: HashSet<String>,
                                                      lce: FlowCollector<Lce<PagedListResult>>) =
            try 
                database.contentDao().insertContentList(
                        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                                .documentChanges
                                ?.map  change -> change.document.toObject(Content::class.java) 
                                ?.filter  content -> !labeledSet.contains(content.id) )
                lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
             catch (error: FirebaseFirestoreException) 
                lce.emit(Error(PagedListResult(
                        null,
                        CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "$error.localizedMessage")))
            

【讨论】:

我尝试了您的 awaitRealtime() 扩展功能,但它对我不起作用。它只在 Firestore 中与实际模型一起发射一次。在发出 continuation.isActive() 之后总是返回 false。你知道为什么/如何让它保持活跃吗? 感谢@elementstyle 的反馈!我昨晚注意到了这个问题,现在正在努力解决它。 我已删除条件!value!!.isEmpty 已解决问题。我将全天测试以确保它正常工作。 嗨@elementstyle,修复在24小时测试后工作。如果它也适用于您自己,请投票赞成答案。谢谢! 感谢您让我知道@Andrew。如果有机会,我将重新查看开源 Coinverse 应用程序中的代码的当前实现,并在必要时在此处更新。【参考方案4】:

这对我有用:

suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit) 
    val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)

    val listenerRegistration = this.addSnapshotListener  value, error ->
        channel.sendBlocking(Pair(value, error))
    

    try 
        block 
            val (value, error) = channel.receive()

            if (error != null) 
                throw error
            
            value
        
    
    finally 
        channel.close()
        listenerRegistration.remove()
    

然后你可以像这样使用它:

docRef.observe  getNextSnapshot ->
    while (true) 
         val value = getNextSnapshot() ?: continue
         // do whatever you like with the database snapshot
    

如果观察者块抛出错误,或者块结束,或者你的协程被取消,监听器会被自动移除。

【讨论】:

以上是关于使用协程的 Firebase 实时快照监听器的主要内容,如果未能解决你的问题,请参考以下文章

Firestore:如果两个侦听器监听相同的查询 firebase 维护两个不同的查询快照?

从Firebase本地实时存储Room DB上的数据

Firebase实时数据库单值监听器多次触发

我们可以在 ios serviceworker 上运行 firebase 实时监听吗?

如何实现一个 Firebase 监听器来观察数据库的实时变化

Firebase:如何停止收听快照