如何使用 kotlin 协程处理回调

Posted

技术标签:

【中文标题】如何使用 kotlin 协程处理回调【英文标题】:how to handle callback using kotlin coroutines 【发布时间】:2020-01-10 02:17:33 【问题描述】:

以下 sn-p 在顺序代码流中将结果返回为“null”。我知道协程可能是异步处理回调的可行解决方案。


    fun getUserProperty(path: String): String? 
        var result: String? = null
        database.child(KEY_USERS).child(getUid()).child(path)
            .addListenerForSingleValueEvent(object : ValueEventListener 
                override fun onCancelled(error: DatabaseError) 
                    Log.e(TAG, "error: $error")
                

                override fun onDataChange(snapshot: DataSnapshot) 
                    Log.w(TAG, "value: $snapshot.value")
                    result = snapshot.value.toString()
                
            )
        return result
    

协程在这种情况下是否可以帮助等待回调的结果(onDataChange()/onCancelled())?

【问题讨论】:

【参考方案1】:

由于 Firebase 实时数据库 SDK 不提供任何挂起功能,协程在处理其 API 时没有帮助。您需要将回调转换为挂起函数,以便能够在协程中等待结果。

这是一个暂停扩展功能(我通过谷歌搜索discovered a solution它):

suspend fun DatabaseReference.getValue(): DataSnapshot 
    return async(CommonPool) 
        suspendCoroutine<DataSnapshot>  continuation ->
            addListenerForSingleValueEvent(FValueEventListener(
                    onDataChange =  continuation.resume(it) ,
                    onError =  continuation.resumeWithException(it.toException()) 
            ))
        
    .await()


class FValueEventListener(val onDataChange: (DataSnapshot) -> Unit, val onError: (DatabaseError) -> Unit) : ValueEventListener 
    override fun onDataChange(data: DataSnapshot) = onDataChange.invoke(data)
    override fun onCancelled(error: DatabaseError) = onError.invoke(error)

有了这个,您现在如何在协同程序中等待 DatabaseReference 上的 getValue() 可疑方法。

【讨论】:

道格,感谢您的意见。是的,遇到了一些类似的方法。想知道用法(引用数据库中的节点,接收值)是如何工作的? gist.github.com/beyondeye/f69ba427938f79801c291d18131ff1d9 我不确定你在问什么。 上面的 sn-p 不确定如何从 getValue() 调用和返回值。正在寻找深入的文章以更好地理解。 async 在 kotlin 中无法使用有什么原因吗?只能找到GlobalScope.async【参考方案2】:

singleValueEvent 的 @Doug 示例,如果您想继续列出,可以使用 coroutine flow,如下所示:

@ExperimentalCoroutinesApi
inline fun <reified T> DatabaseReference.listen(): Flow<DataResult<T?>> =
  callbackFlow 
    val valueListener = object : ValueEventListener 
      override fun onCancelled(databaseError: DatabaseError) 
        close(databaseError.toException())
      

      override fun onDataChange(dataSnapshot: DataSnapshot) 
        try 
          val value = dataSnapshot.getValue(T::class.java)
          offer(DataResult.Success(value))
         catch (exp: Exception) 
          Timber.e(exp)
          if (!isClosedForSend) offer(DataResult.Error(exp))
        
      
    
    addValueEventListener(valueListener)

    awaitClose  removeEventListener(valueListener) 
  

【讨论】:

我们必须有awaitClose吗? @IgorGanapolsky 是的【参考方案3】:

如果有人仍然使用原始答案的代码,但需要更新它以匹配 Coroutines 的非实验版本,这就是我如何更改它:

import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

suspend fun DatabaseReference.getSnapshotValue(): DataSnapshot 
    return withContext(Dispatchers.IO) 
        suspendCoroutine<DataSnapshot>  continuation ->
            addListenerForSingleValueEvent(FValueEventListener(
                onDataChange =  continuation.resume(it) ,
                onError =  continuation.resumeWithException(it.toException()) 
            ))
        
    


class FValueEventListener(val onDataChange: (DataSnapshot) -> Unit, val onError: (DatabaseError) -> Unit) : ValueEventListener 
    override fun onDataChange(data: DataSnapshot) = onDataChange.invoke(data)
    override fun onCancelled(error: DatabaseError) = onError.invoke(error)

然后使用它就像:val snapshot = ref.getSnapshotValue()

更新

我还需要观察一个节点并使用 Omar 的答案来做到这一点。如果有人需要一个如何在这里使用它的例子,那就是:

@ExperimentalCoroutinesApi
inline fun <reified T> DatabaseReference.listen(): Flow<T?>? =
callbackFlow 
    val valueListener = object : ValueEventListener 
        override fun onCancelled(databaseError: DatabaseError) 
            close()
        

        override fun onDataChange(dataSnapshot: DataSnapshot) 
            try 
                val value = dataSnapshot.getValue(T::class.java)
                offer(value)
             catch (exp: Exception) 
                if (!isClosedForSend) offer(null)
            
        
    
    addValueEventListener(valueListener)
    awaitClose  removeEventListener(valueListener) 

然后要在 Activity 或 Fragment 中调用它,您可以像这样创建您的侦听器:

var listener =  FirebaseUtils.databaseReference
   .child(AppConstants.FIREBASE_PATH_EMPLOYEES)
   .child(AuthUtils.retrieveUID()!!).listen<User>()

然后在你的函数中调用它:

CoroutineScope(IO).launch 
    withContext(IO) 
        listener?.collect
            print(it)
        
    

然后在onStop()里面处理:

override fun onStop()
    listener = null
    super.onStop()

【讨论】:

为什么需要CoroutineScope(IO)withContext(IO) 我不得不这样做,因为我没有使用 ViewModel 库。我有另一个协程我需要使用它,所以你可以删除 withContext 我认为

以上是关于如何使用 kotlin 协程处理回调的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Kotlin协程如何将回调改写成挂起函数

Kotlin回调函数转协程

Kotlin 协程 - 延迟,它是如何工作的?

使用协程和 Flow 简化 API 设计

使用协程和 Flow 简化 API 设计

kotlin协程硬核解读(1. 协程初体验)