对 clojure 异步通道的超时和缓冲区溢出执行任务
Posted
技术标签:
【中文标题】对 clojure 异步通道的超时和缓冲区溢出执行任务【英文标题】:Doing tasks on timeout and buffer overflow of clojure async channels 【发布时间】:2016-06-05 03:19:35 【问题描述】:我有一个缓冲通道,它正在缓存一些值(可以说它是一个包含许多值的列表)。现在,一旦缓冲区已满或发生超时,我必须完全读取(清空)通道。
完成此操作后,我希望通道再次开始缓存来自同一源的值,直到源为空。
我在网上看了很多资料,但概念还是有点混乱。
如何使用 clojure.core.async 做到这一点?
编辑
好的,所以我基本上设法编写了一些可以解决问题的代码。
(require '[clojure.core.async :as a :refer [>! <! >!! <!! go chan buffer close! thread alts! alts!! timeout offer! poll! buffer put!]])
(defn on-overflow-or-timeout [channel]
(do
(println "current used space: " (count (.buf channel)))
(if (> (count (.buf channel)) 0)
(let [loop-range (range (count (.buf channel)))]
(do
(println "retrieving values.....")
(dorun
(for [i loop-range]
(println "retrieved value: " (poll! channel))
))))
(println "No values in the channel. Please restart the process....")
)))
(defn process [channel buffer-size tout]
(let [tch (timeout tout)
check-chan (chan 2)]
(loop []
(let [value (read-string (read-line))]
(do
(println "Storing the value in actual channel: " value)
(offer! channel value)
(offer! check-chan value)
; Checking only till half its capacity
(if (>= (count (.buf channel)) (int (Math/ceil (/ buffer-size 2))))
(do
(println "overflowed.....")
(on-overflow-or-timeout channel)
(recur)
)
(let [[win-val win-chan] (alts!! [check-chan tch])]
(if (nil? win-val)
(do
(println "timed out.....")
(on-overflow-or-timeout channel)
(recur)
)
(do
(println "retrieved value from check-chan: " win-val)
(recur)
)))))))))
但我仍然觉得这段代码需要使用 GO 块或其他东西进行优化。谁能指出这段代码中的缺陷并朝着正确的方向调整它?
请注意,我将使用此代码来缓存 elasticsearch 查询和结果或类似的东西,并在超时或缓冲区已满时将它们存储在某处。
【问题讨论】:
向我们展示一些代码以及您遇到的问题? @glts 很抱歉,我什至无法开始编码,因为我对 core.async 感到非常困惑。我并不是要一个完整的解决方案。只是整个过程的一个小轮廓,甚至可能是某种伪代码。 @glts 我写了一些代码。看看就好,请指出缺陷。 【参考方案1】:频道具有.buf
字段这一事实是一个实现细节,您不应使用它。
不是 100% 了解您的要求,但在使用 core.async 时,您必须尝试从“线程”(go 块)的角度思考,只做一件事并通过通道与其他“线程”通信。
根据我从您的代码中收集到的信息,您似乎想要:
-
从
read-line
读取的执行块。会写信给陈
立即作用于新值的 go 块。会读到一个chan
执行缓冲的 go 块。会从一个chan读取并写入另一个
对缓冲区执行任何操作的 go 块。会读到一个chan
(2) 和 (3) 需要从 (1) 消费。使用mult 频道,这样他们都可以获得(1)所写内容的副本。 (4) 可以直接从 (3) 读取。
(3) 的可能实现如下所示:
(defn buffer-or-timeout [src out buffer-size tout]
(go-loop [[timeout buffer to-send]
[(a/timeout tout) [] nil]]
(when (seq to-send)
(println "flushing" to-send)
(a/>! out to-send))
(recur (a/alt!
src ([v] (if (= buffer-size (inc (count buffer)))
[(a/timeout tout) [] (conj buffer v)]
[timeout (conj buffer v) nil]))
timeout [(a/timeout tout) [] buffer]))))
【讨论】:
以上是关于对 clojure 异步通道的超时和缓冲区溢出执行任务的主要内容,如果未能解决你的问题,请参考以下文章