Kotlin Coroutines 通道在 fixedRateTimer 内发送

Posted

技术标签:

【中文标题】Kotlin Coroutines 通道在 fixedRateTimer 内发送【英文标题】:Kotlin Coroutines channels send inside fixedRateTimer 【发布时间】:2020-10-24 06:40:03 【问题描述】:

我第一次使用 Kotlin Coroutines 从事一个爱好项目。我已经阅读并观看了有关它的视频,我有点了解这个概念。但我遇到了一个问题。让我给你看看我的代码。

package com.dev.tuber.ingestion.snapshots

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import org.joda.time.LocalTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.fixedRateTimer

object SnapshotsBuffer 
    private val buffer = ConcurrentHashMap<Int, MutableMap<Int, Queue<Snapshot>>>()

    init 
        for (minute in 0..59) 
            buffer[minute] = mutableMapOf()
        
    

    suspend fun start(snapshotsChannel: Channel<Snapshot>, composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) 
        startComposing(composeSnapshots)

        for (snapshot in snapshotsChannel) 
            val currentMinute = getCurrentMinute()
            if (!buffer[currentMinute]!!.containsKey(snapshot.pair.id)) 
                buffer[currentMinute]!![snapshot.pair.id] = LinkedList()
            

            buffer[currentMinute]!![snapshot.pair.id]!!.add(snapshot)
            println(buffer)
        
    

    private fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) 
        val oneMinute = (1000 * 60).toLong()

        fixedRateTimer("consuming", true, oneMinute, oneMinute) 
            val previousMinute = getPreviousMinute()
            composeSnapshots.send(buffer[previousMinute]!!) <---- cannot do this
            buffer[getPreviousMinute()] = mutableMapOf()
        
    

    private fun getCurrentMinute(): Int 
        return LocalTime().minuteOfHour
    

    private fun getPreviousMinute(): Int 
        val currentMinute = getCurrentMinute()

        if(currentMinute == 0) return 59
        return currentMinute - 1
    

所以。我有两个频道。第一个频道是snapshotsChannel,这是Snapshot 将到达的地方。我想缓冲Snapshot 并且每当一分钟过去我想将缓冲区发送到composeSnapshots 通道以进行进一步处理。

基本上我得到了很多Snapshot,我不想直接将它们发送给进一步处理。所以这就是为什么我想每对每分钟缓冲一次。

问题出现在startComposing 函数中。 fixedRateTimer 不是可挂起的函数,所以我不能在这里使用发送函数。我现在有点卡住了,因为我找不到解决方案。我研究了 TickerChannel 和 Kotlin Flow,但这似乎不是解决我问题的正确方法。

你知道解决办法吗?

【问题讨论】:

【参考方案1】:

您不能从非挂起函数调用挂起函数 (suspend fun Channel.send(element: E))。

在协程方式中,您可以有一个无限循环,该循环将自身暂停一分钟并重复发送到通道。很棒的是,延迟与取消合作。

private suspend fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) 
    val oneMinute = (1000 * 60).toLong()

    while(true) 
        delay(oneMinute)

        val previousMinute = getPreviousMinute()
        composeSnapshots.send(buffer[previousMinute]!!)
        buffer[previousMinute] = mutableMapOf()
    

【讨论】:

以上是关于Kotlin Coroutines 通道在 fixedRateTimer 内发送的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin Coroutines 协程实现原理全解析

如何使用 Kotlin Coroutines 在 Retrofit 中处理 204 响应?

Kotlin Coroutines 的现有 3 函数回调

Kotlin Coroutines 选择 Dispatcher

Kotlin Coroutines 中的 main-safe 是啥?

如何在 Spring 响应式 WebClient 中返回 Kotlin Coroutines Flow