RabbitMQ:客户端未收到客户端启动前发送的消息

Posted

技术标签:

【中文标题】RabbitMQ:客户端未收到客户端启动前发送的消息【英文标题】:RabbitMQ: Client does not receive messages sent before client boots 【发布时间】:2019-12-02 00:00:06 【问题描述】:

我将 RabbitMQ 与 Langohr(clojure 客户端)一起使用。如果消费者已经在运行,我可以很好地接收消息,但如果我在客户端启动之前发送消息,客户端永远不会收到这些消息。

是否有一个配置选项(或服务器和客户端的配置选项群)确保客户端可以接收在运行之前生成的消息?

如何开始尝试测试或调试?

我正在使用默认交换。这是我用来设置客户端的代码:

(ns sisyphus.rabbit
  (:require
   [cheshire.core :as json]
   [langohr.core :as lcore]
   [langohr.channel :as lchannel]
   [langohr.exchange :as lexchange]
   [langohr.queue :as lqueue]
   [langohr.consumers :as lconsumers]
   [langohr.basic :as lbasic]
   [sisyphus.log :as log]))

(defn connect!
  [config]
  (let [connection (lcore/connect )
        channel (lchannel/open connection)
        _ (lbasic/qos channel 1)
        queue-name (get config :queue "sisyphus")
        exchange (get config :exchange "")
        queue (lqueue/declare channel "sisyphus" :exclusive false :durable true)
        routing-key (get config :routing-key "sisyphus")]
    (if-not (= exchange "")
      (lqueue/bind channel queue-name exchange :routing-key routing-key))
    :queue queue
     :queue-name queue-name
     :exchange exchange
     :routing-key routing-key
     :connection connection
     :channel channel
     :config config)

然后发布:

(defn publish!
  [rabbit message]
  (lbasic/publish
   (:channel rabbit)
   (:exchange rabbit)
   (:routing-key rabbit)
   (json/generate-string message)
   :content-type "text/plain"
    :peristent true))

谢谢!

【问题讨论】:

我看到消费者声明了一个持久队列并将其绑定到交换器。即使消费者离开,这部分拓扑也应该持续存在。是这样吗?如果是,则路由到此队列的消息应保留在那里,直到消费者开始消费。会发生这种情况吗? 【参考方案1】:

看起来您可以设置消息和队列的生存时间。因此,当没有人连接到队列时,队列会一直存在,因此消息会保留一段时间而不会被宣布为“死亡”

http://clojurerabbitmq.info/articles/extensions.html#per-queue-message-time-to-live

例子

以下示例将新的服务器命名队列的消息 TTL 设置为 500 毫秒。然后它会发布一条路由到队列的消息,并在等待 600 毫秒后统计队列中的消息:

(ns clojurewerkz.langohr.examples.per-queue-message-ttl
  (:gen-class)
  (:require [langohr.core    :as rmq]
            [langohr.channel :as lch]
            [langohr.queue   :as lq]
            [langohr.basic   :as lb]))

(def ^:const true
  default-exchange-name "")

(defn -main
  [& args]
  (let [conn  (rmq/connect)
        ch    (lch/open conn)
        qname "clojurewerkz.langohr.examples.per-queue-message-ttl"]
    (lq/declare ch qname :arguments "x-message-ttl" 500 :durable false)
    (lb/publish ch default-exchange-name qname "a message")
    (Thread/sleep 50)
    (println (format "Queue %s has %d messages" qname (lq/message-count ch qname)))
    (println "Waiting for 600 ms")
    (Thread/sleep 600)
    (println (format "Queue %s has %d messages" qname (lq/message-count ch qname)))
    (println "[main] Disconnecting...")
    (rmq/close ch)
    (rmq/close conn)))

【讨论】:

嗯,基于这个答案:***.com/questions/24946181/… 看起来默认 TTL 是无穷大,所以我看不出这有什么帮助? 是消息存活的时间还是队列继续存在的时间?您能否确认 TTL 值符合预期?【参考方案2】:

事实证明,您需要设置 :auto-delete false 才能使持久队列运行良好。你知道的越多!

【讨论】:

以上是关于RabbitMQ:客户端未收到客户端启动前发送的消息的主要内容,如果未能解决你的问题,请参考以下文章

第五章:Python 之 RabbitMQ消息分发轮询

rabbitmq能处理多大的数据

RabbitMQ exchange route queue原理

RabbitMQ exchange route queue原理

在Python中使用select()方法进行客户端/服务器聊天

Javascript - 当服务器发送大响应时,WebSocket 客户端未收到 onmessage 事件