对 clojure 异步通道的超时和缓冲区溢出执行任务

Posted

技术标签:

【中文标题】对 clojure 异步通道的超时和缓冲区溢出执行任务【英文标题】:Doing tasks on timeout and buffer overflow of clojure async channels 【发布时间】:2016-06-05 03:19:35 【问题描述】:

我有一个缓冲通道,它正在缓存一些值(可以说它是一个包含许多值的列表)。现在,一旦缓冲区已满或发生超时,我必须完全读取(清空)通道。

完成此操作后,我希望通道再次开始缓存来自同一源的值,直到源为空。

我在网上看了很多资料,但概念还是有点混乱。

如何使用 clojure.core.async 做到这一点?

编辑

好的,所以我基本上设法编写了一些可以解决问题的代码。

(require '[clojure.core.async :as a :refer [>! <! >!! <!! go chan buffer close! thread alts! alts!! timeout offer! poll! buffer put!]])

(defn on-overflow-or-timeout [channel]
  (do
    (println "current used space: " (count (.buf channel)))
      (if (> (count (.buf channel)) 0)
      (let [loop-range (range (count (.buf channel)))]
        (do
          (println "retrieving values.....")
          (dorun
          (for [i loop-range]
            (println "retrieved value: " (poll! channel))
            ))))
      (println "No values in the channel. Please restart the process....")
      )))


(defn process [channel buffer-size tout]
  (let [tch (timeout tout)
        check-chan (chan 2)]
    (loop []
      (let [value (read-string (read-line))]
        (do
                (println "Storing the value in actual channel: " value)
                (offer! channel value)
          (offer! check-chan value)
          ; Checking only till half its capacity
          (if (>= (count (.buf channel)) (int (Math/ceil (/ buffer-size 2))))
            (do
              (println "overflowed.....")
              (on-overflow-or-timeout channel)
              (recur)
              )
            (let [[win-val win-chan] (alts!! [check-chan tch])]
              (if (nil? win-val)
                (do
                  (println "timed out.....")
                  (on-overflow-or-timeout channel)
                  (recur)
                  )
                (do
                  (println "retrieved value from check-chan: " win-val)
                  (recur)
                  )))))))))

但我仍然觉得这段代码需要使用 GO 块或其他东西进行优化。谁能指出这段代码中的缺陷并朝着正确的方向调整它?

请注意,我将使用此代码来缓存 elasticsearch 查询和结果或类似的东西,并在超时或缓冲区已满时将它们存储在某处。

【问题讨论】:

向我们展示一些代码以及您遇到的问题? @glts 很抱歉,我什至无法开始编码,因为我对 core.async 感到非常困惑。我并不是要一个完整的解决方案。只是整个过程的一个小轮廓,甚至可能是某种伪代码。 @glts 我写了一些代码。看看就好,请指出缺陷。 【参考方案1】:

频道具有.buf 字段这一事实是一个实现细节,您不应使用它。

不是 100% 了解您的要求,但在使用 core.async 时,您必须尝试从“线程”(go 块)的角度思考,只做一件事并通过通道与其他“线程”通信。

根据我从您的代码中收集到的信息,您似乎想要:

    read-line 读取的执行块。会写信给陈 立即作用于新值的 go 块。会读到一个chan 执行缓冲的 go 块。会从一个chan读取并写入另一个 对缓冲区执行任何操作的 go 块。会读到一个chan

(2) 和 (3) 需要从 (1) 消费。使用mult 频道,这样他们都可以获得(1)所写内容的副本。 (4) 可以直接从 (3) 读取。

(3) 的可能实现如下所示:

(defn buffer-or-timeout [src out buffer-size tout]
  (go-loop [[timeout          buffer to-send]
            [(a/timeout tout) []     nil]]
    (when (seq to-send)
      (println "flushing" to-send)
      (a/>! out to-send))
    (recur (a/alt!
             src ([v] (if (= buffer-size (inc (count buffer)))
                        [(a/timeout tout) [] (conj buffer v)]
                        [timeout (conj buffer v) nil]))
             timeout [(a/timeout tout) [] buffer]))))

【讨论】:

以上是关于对 clojure 异步通道的超时和缓冲区溢出执行任务的主要内容,如果未能解决你的问题,请参考以下文章

Clojure 中的速率限制 core.async 通道

使用缓冲区溢出覆盖位于缓冲区下方的局部变量

Boost Beast websocket 服务器异步接受失败,缓冲区溢出

java nio

java NIO

Java NIO 学习