PublishSubject 与 Kotlin 协程(流/通道)
Posted
技术标签:
【中文标题】PublishSubject 与 Kotlin 协程(流/通道)【英文标题】:PublishSubject with Kotlin coroutines (Flow/Channel) 【发布时间】:2021-06-18 12:49:03 【问题描述】:我正在上传经过改造的文件。想听听Kotlin的改造进度,用ui上的进度条显示。当我对此进行研究时,我在此页面上使用 PublishSubject 在 java 中找到了一个很好的示例。 Is it possible to show progress bar when upload image via Retrofit 2? 我想将其改编为我自己的代码,但我无法决定是否应该使用 Flow 或 Channel 而不是 Kotlin 中的 PublishSubject。
我的 requestbody 类与 broadcastChannel:
class ProgressRequestBody : RequestBody
val mFile: File
val ignoreFirstNumberOfWriteToCalls : Int
constructor(mFile: File) : super()
this.mFile = mFile
ignoreFirstNumberOfWriteToCalls = 0
constructor(mFile: File, ignoreFirstNumberOfWriteToCalls : Int) : super()
this.mFile = mFile
this.ignoreFirstNumberOfWriteToCalls = ignoreFirstNumberOfWriteToCalls
var numWriteToCalls = 0
val getProgress = BroadcastChannel<Float>(1)
override fun contentType(): MediaType?
return "image/*".toMediaTypeOrNull()
@Throws(IOException::class)
override fun contentLength(): Long
return mFile.length()
@Throws(IOException::class)
override fun writeTo(sink: BufferedSink)
numWriteToCalls++
val fileLength = mFile.length()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
val `in` = FileInputStream(mFile)
var uploaded: Long = 0
try
var read: Int
var lastProgressPercentUpdate = 0.0f
read = `in`.read(buffer)
while (read != -1)
uploaded += read.toLong()
sink.write(buffer, 0, read)
read = `in`.read(buffer)
// when using HttpLoggingInterceptor it calls writeTo and passes data into a local buffer just for logging purposes.
// the second call to write to is the progress we actually want to track
if (numWriteToCalls > ignoreFirstNumberOfWriteToCalls )
val progress = (uploaded.toFloat() / fileLength.toFloat()) * 100f
//prevent publishing too many updates, which slows upload, by checking if the upload has progressed by at least 1 percent
if (progress - lastProgressPercentUpdate > 1 || progress == 100f)
// publish progress
getProgress.send(progress)
lastProgressPercentUpdate = progress
finally
`in`.close()
companion object
private val DEFAULT_BUFFER_SIZE = 2048
问题是 broadcastChannel.send(progress)
希望函数暂停,但 RequestBody writeTo
方法不应该暂停。在这种情况下,我很困惑。我应该使用 Flow 还是 BroadCastChannel?你能帮帮我吗?
【问题讨论】:
如果您在 JVM 或 android 上,您应该可以使用sendBlocking()
而不是 send()
,它不会挂起(但会阻塞当前线程)。请注意,在将阻塞代码与挂起代码桥接的情况下,您始终可以选择使用 runBlocking
(同样,仅在 JVM 上)
StateFlow
【参考方案1】:
你应该使用 MutableStateFlow:
val getProgress = MutableStateFlow<Float>(0f) // Initial value is 0
然后:
getProgress.value = progress
查看更多:StateFlow and SharedFlow
【讨论】:
【参考方案2】:val getProgress = MutableSharedFlow<Float>(extraBufferCapacity = 1)
getProgress.tryEmit(progress)
【讨论】:
感谢您的回复,这解决了我的问题以上是关于PublishSubject 与 Kotlin 协程(流/通道)的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 启动协程launch 与async的区别按照顺序启动协程
Kotlin 协程协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )
Kotlin 协程协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )