Clojure 中的速率限制 core.async 通道

Posted

技术标签:

【中文标题】Clojure 中的速率限制 core.async 通道【英文标题】:Rate limiting core.async channels in Clojure 【发布时间】:2014-03-20 23:56:44 【问题描述】:

我正在将 Clojure 与 core.async 一起使用,并且有一种情况,我想对通过通道处理的消息数量设置速率限制。

我特别想:

定义速率限制,例如每秒 1000 条消息 只要消息数量低于速率限制,就可以正常(并及时)处理消息 如果超出速率限制,对事件进行某种合理的替代处理(例如,告诉客户端稍后再试) 开销相当低

实现这一目标的最佳方法是什么?

【问题讨论】:

【参考方案1】:

这是一种使用原子来计算正在发送的消息数量并定期将其重置为零的方法:

(def counter (atom 0))

(def time-period 1000) ;milliseconds

(def max-rate 1000) ;max number of messages per time-period

(def ch (chan))

(defn alert-client []
  (println "That's enough!"))

(go (while true (<! (timeout time-period)) (reset! counter 0))) ; reset counter periodically 

(defn process [msg]
  (if (> (swap! counter inc) max-rate) (alert-client) (put! ch msg)))

(doseq [x (range 1001)] (process x)) ; throw some messages at the channel

您需要更多代码才能使用来自频道的消息。如果您不确定是否能够以限制消息的速率持续使用消息,您可能需要指定通道缓冲区大小或通道类型(下降/滑动)。

【讨论】:

【参考方案2】:

您正在寻找的东西被称为断路器。我认为***页面的描述相当糟糕:

http://en.wikipedia.org/wiki/Circuit_breaker_design_pattern

不过,我们的 Scala 朋友做得非常棒:

http://doc.akka.io/docs/akka/2.2.3/common/circuitbreaker.html

还有一个 clojure 库,但您必须自己与 core.async 进行集成:

https://github.com/krukow/clojure-circuit-breaker

https://github.com/josephwilk/circuit-breaker

一篇关于断路器和使用 clojure 进行缩放的博文:

http://blog.josephwilk.net/clojure/building-clojure-services-at-scale.html

看起来您可能需要考虑提供 clojure 绑定的 netflix Hystrix 之类的东西:

https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-clj

HTH

【讨论】:

【参考方案3】:

问题分解:

    定义速率限制,例如每秒 1000 条消息 只要消息数量较少,就可以正常(并及时)处理消息 超过速率限制 如果超出速率限制(例如告诉客户 请稍后再试) 开销相当低

我正在通过一个简单地将通道组合成循环的解决方案来解决这个问题。

一种常见的速率限制算法称为Token bucket。您有一个固定大小的令牌桶,并以固定的速率添加令牌。只要有令牌,就可以发送消息。

桶的大小决定了“突发性”(你能多快赶上最大速率),而速率决定了最大平均速率。这些将是我们代码的参数。

让我们创建一个以给定速率发送消息(不管是什么)的通道。 (#1)

(defn rate-chan [burstiness rate]
  (let [c (chan burstiness) ;; bucket size is buffer size
        delta (/ 1000 rate)]
    (go
      (while true
        (>! c :go) ;; send a token, will block if bucket is full
        (<! (timeout delta)))) ;; wait a little
    c))

现在我们需要一个按速率限制另一个通道的通道。 (#2)

(defn limit-chan [in rc]
  (let [c (chan)]
    (go 
      (while true
        (<! rc) ;; wait for token
        (>! c (<! in)))) ;; pass message along
    c))

现在我们可以在没有消息等待的情况下使用这些默认通道:

(defn chan-with-default [in]
  (let [c (chan)]
    (go
      (while true
        ;; take from in, or if not available, pass useful message
        (>! c (alts! [in] :default :rate-exceeded))))
    c))

现在我们有了解决问题的所有方法。

(def rchan (-> (chan)
               (limit-chan (rate-chan 100 1000))
               (chan-with-default)))

就#4 而言,这不是绝对最快的解决方案。但它使用可组合部件,并且可能足够快。如果您希望它更快,您可以创建一个循环来完成所有这些(而不是将其分解为更小的函数)。最快的方法是自己实现interfaces。

【讨论】:

如果rchan 中没有任何内容,它不会继续发出:rate-exceeded 吗?另外,关于背压的要求#3 不是让生产者知道限制,而不是让通道消费者知道吗?【参考方案4】:

我写了a little library 来解决这个问题。它的实现与 Eric Normand 的非常相似,但对高吞吐量通道采取了一些措施(对于接近毫秒的睡眠时间,超时并不精确)。

还支持对一组通道进行全局限流,功能限流。

查看here。

【讨论】:

以上是关于Clojure 中的速率限制 core.async 通道的主要内容,如果未能解决你的问题,请参考以下文章

是否应该耗尽 clojure.core.async 通道以释放停放的 put

带有外部绑定符号的 core.async go 块可以工作,但不能进行宏扩展

我可以使用 http-kit 和 core.async 制作一个完全非阻塞的后端应用程序吗?

Clojure基础课程2-Clojure中的数据长啥样?

Clojure基础课程2-Clojure中的数据长啥样?

安全规则中的 Firebase 速率限制?