是否应该耗尽 clojure.core.async 通道以释放停放的 put

Posted

技术标签:

【中文标题】是否应该耗尽 clojure.core.async 通道以释放停放的 put【英文标题】:Should clojure.core.async channel be drained to release parked puts 【发布时间】:2018-01-11 09:24:59 【问题描述】:

问题:我有消费者从中读取的通道,并且在获得足够数据时可能会停止读取。当阅读器停止时,它会使用 clojure.core.async/close 关闭频道!

文档说此时所有对通道的puts close 被调用后应该返回false 并且什么都不做。但是文档也说

在所有看跌期权都已交付后发生逻辑关闭。因此,任何受阻或停放的看跌期权将保持受阻/停放,直到接受者释放它们。

这是否意味着要释放在关闭通道时已经被阻塞的生产者,我应该总是在消费者端排空通道(读取所有剩余的项目)?以下代码显示 go 块永远不会结束:

(require '[clojure.core.async :as a])
(let [c (a/chan)]
  (a/go
    (prn "Go")
    (prn "Put" (a/>! c 333)))
  (Thread/sleep 300) ;; Let go block to be scheduled
  (a/close! c))

如果这是真的,并且我不想阅读所有事件,那么我应该实现例如生产者端超时以检测不需要更多数据?

有没有更简单的方法让消费者告诉“足够”来推动生产者也优雅地停止?

我发现clojure.core.async/put! 不会阻塞并允许避免不必要的阻塞。用它代替clojure.core.aasync/>!有什么缺点吗?

【问题讨论】:

我一直更喜欢put!的非阻塞形式。我定义了一个别名tupelo.async/put-later!(仿照SwingUtilities.invokeLater()),我觉得它更容易记住。 令我惊讶的是,默认的(c.c.a/chan) 缓冲区实际上是无限的(无论您指定哪个缓冲区大小),所以实际上它永远不会溢出。依靠这个 bug 或者提供适当的自定义缓冲区,put! 可以用作非阻塞。 【参考方案1】:

关闭频道会释放所有正在阅读它们的人,并阻止作者

这是阅读案例(效果很好):

user> (def a-chan (async/chan))
#'user/a-chan
user> (future (async/<!! a-chan)
              (println "continuting after take"))
#future[:status :pending, :val nil 0x5fb5a025]
user> (async/close! a-chan)
nil
user> continuting after take

这是一个写作案例的测试,正如你所说,排干它可能是一个好主意:

user> (def b-chan (async/chan))
#'user/b-chan
user> (future (try (async/>!! b-chan 4)
                   (println "continuting after put")
                   (catch Exception e
                     (println "got exception" e))
                   (finally
                     (println "finished in finally"))))
#future[:status :pending, :val nil 0x17be0f7b]
user> (async/close! b-chan)
nil

我没有找到任何证据表明当 chan 关闭时卡住的作者在这里解除阻塞

【讨论】:

是的,您描述了有问题的问题。关闭频道应该解除阻止作者,即使他们目前处于停放状态,所以这是clojure.core.async 中的一个错误。问题是如何从消费者方面通知作者(最好没有反向渠道)。或者至少可靠地避免在写入端阻塞。【参考方案2】:

这种行为是有意的,因为他们在文档中明确声明了它!

在您的情况下,在关闭通道c 后执行(while (async/poll! c)) 以释放所有阻塞/停放(消息发送)线程/go-blocks。

如果你想对内容做任何事情,你可以做:

(->> (repeatedly #(async/poll! c))
     (take-while identity))

【讨论】:

我希望有更好的东西 :) 消费者可能会死于例如异常,不再接受输入。

以上是关于是否应该耗尽 clojure.core.async 通道以释放停放的 put的主要内容,如果未能解决你的问题,请参考以下文章

Android 调试 -> 电池耗尽

是否可以通过向 localhost 快速发送大量 HTTP 请求来耗尽您的动态端口范围?

使用 gps 捕获经纬度而不会耗尽电池电量

Laravel CSV 导入内存不足(允许内存耗尽)

未捕获的错误:返回的值无效,是不是耗尽了 Gas?

调试桌面堆耗尽