是否应该耗尽 clojure.core.async 通道以释放停放的 put
Posted
技术标签:
【中文标题】是否应该耗尽 clojure.core.async 通道以释放停放的 put【英文标题】:Should clojure.core.async channel be drained to release parked puts 【发布时间】:2018-01-11 09:24:59 【问题描述】:问题:我有消费者从中读取的通道,并且在获得足够数据时可能会停止读取。当阅读器停止时,它会使用 clojure.core.async/close 关闭频道!
文档说此时所有对通道的puts 在 close 被调用后应该返回false 并且什么都不做。但是文档也说
在所有看跌期权都已交付后发生逻辑关闭。因此,任何受阻或停放的看跌期权将保持受阻/停放,直到接受者释放它们。
这是否意味着要释放在关闭通道时已经被阻塞的生产者,我应该总是在消费者端排空通道(读取所有剩余的项目)?以下代码显示 go 块永远不会结束:
(require '[clojure.core.async :as a])
(let [c (a/chan)]
(a/go
(prn "Go")
(prn "Put" (a/>! c 333)))
(Thread/sleep 300) ;; Let go block to be scheduled
(a/close! c))
如果这是真的,并且我不想阅读所有事件,那么我应该实现例如生产者端超时以检测不需要更多数据?
有没有更简单的方法让消费者告诉“足够”来推动生产者也优雅地停止?
我发现clojure.core.async/put!
不会阻塞并允许避免不必要的阻塞。用它代替clojure.core.aasync/>!
有什么缺点吗?
【问题讨论】:
我一直更喜欢put!
的非阻塞形式。我定义了一个别名tupelo.async/put-later!
(仿照SwingUtilities.invokeLater()
),我觉得它更容易记住。
令我惊讶的是,默认的(c.c.a/chan)
缓冲区实际上是无限的(无论您指定哪个缓冲区大小),所以实际上它永远不会溢出。依靠这个 bug 或者提供适当的自定义缓冲区,put!
可以用作非阻塞。
【参考方案1】:
关闭频道会释放所有正在阅读它们的人,并阻止作者
这是阅读案例(效果很好):
user> (def a-chan (async/chan))
#'user/a-chan
user> (future (async/<!! a-chan)
(println "continuting after take"))
#future[:status :pending, :val nil 0x5fb5a025]
user> (async/close! a-chan)
nil
user> continuting after take
这是一个写作案例的测试,正如你所说,排干它可能是一个好主意:
user> (def b-chan (async/chan))
#'user/b-chan
user> (future (try (async/>!! b-chan 4)
(println "continuting after put")
(catch Exception e
(println "got exception" e))
(finally
(println "finished in finally"))))
#future[:status :pending, :val nil 0x17be0f7b]
user> (async/close! b-chan)
nil
我没有找到任何证据表明当 chan 关闭时卡住的作者在这里解除阻塞
【讨论】:
是的,您描述了有问题的问题。关闭频道应该解除阻止作者,即使他们目前处于停放状态,所以这是clojure.core.async
中的一个错误。问题是如何从消费者方面通知作者(最好没有反向渠道)。或者至少可靠地避免在写入端阻塞。【参考方案2】:
这种行为是有意的,因为他们在文档中明确声明了它!
在您的情况下,在关闭通道c
后执行(while (async/poll! c))
以释放所有阻塞/停放(消息发送)线程/go-blocks。
如果你想对内容做任何事情,你可以做:
(->> (repeatedly #(async/poll! c))
(take-while identity))
【讨论】:
我希望有更好的东西 :) 消费者可能会死于例如异常,不再接受输入。以上是关于是否应该耗尽 clojure.core.async 通道以释放停放的 put的主要内容,如果未能解决你的问题,请参考以下文章