向多个 RabbitMQ 消费者发布消息

Posted

技术标签:

【中文标题】向多个 RabbitMQ 消费者发布消息【英文标题】:Publishing a message to multiple RabbitMQ consumers 【发布时间】:2015-08-17 22:22:50 【问题描述】:

我一直在构建一个使用 RabbitMQ 工作的 webapp。有 3-5 个 worker,处理来自一个队列的数据,偶尔一个 worker 需要向所有 worker 发送消息。

我一直在阅读 RabbitMQ 文档,但很难找到有用的东西。

当一个新的worker启动时,它会生成一个UUID(v6),然后订阅队列worker.id,例如worker.7a277e65-8df4-4670-99b4-52c13478831d

从其他问题中,我已经能够确定我需要创建一个主题交换以允许工作人员将消息发布到worker.*,以便将其发送给每个工作人员。

但是,我不确定应该如何执行此操作。我正在使用jackrabbit,它不支持以编程方式创建交换,所以我需要通过 RabbitMQ 管理控制台来完成,如下所示:

您能否就我应该创建的交换类型和参数提供建议,如果不能像通常的队列那样处理,也许还可以提供一个使用示例?

【问题讨论】:

【参考方案1】:

如果 Jackrabbit 不允许您指定交换、队列或绑定,那么您需要找到另一个库。

问题将出在您的绑定上。如您所见,您可以从管理 UI 创建交换、队列和绑定。但是随着 Jackrabbit 创建特定于工作人员的队列,您将无法在您的交换和 Jackrabbit 创建的队列之间创建绑定。

我使用并强烈推荐 wascally 库。它建立在 amqplib 之上,与 Jackrabbit 相同,但提供了更多功能丰富的 API 和功能。

作为如何使用 Wascally 的示例...使用此代码设置三个文件:

config.json



  "connection": 
    "user": "test",
    "pass": "password",
    "server": "localhost",
    "vhost": "test-app"
  ,

  "exchanges":[
    "name": "sample-ex.1", "type": "direct"
  ],

  "queues":[
    "name": "sample-q.1"
  ],

  "bindings":[
    "exchange": "sample-ex.1", "target": "sample-q.1"
  ]

receiver.js


var rabbit = require("wascally");
var config = require("config.json");

rabbit.configure(config)
  .then(function()
    rabbit.handle("test.message.type", handleMessage);
    rabbit.startSubscription("sample-q.1");
  )
  .then(undefined, reportError);

function reportError(err)
  console.log(err.stack);
  process.exit();


function handleMessage(msg)
  var body = msg.body;

  console.log("received a message");
  console.log(body);
  msg.ack();

  setTimeout(function()
    // do work
  , 5000);

sender.js


var rabbit = require("wascally");
var config = require("config.json");

rabbit.configure(config)
  .then(function()
    rabbit.publish("sample-ex.1", 
      type: "test.message.type",
      routingKey: "",
      body: 
        foo: "bar"
      
    );
  )
  .then(undefined, reportError);

function reportError(err)
  console.log(err.stack);
  process.exit();

现在您可以根据需要创建任意数量的接收器实例。只需运行node receiver.js 几次。

然后运行node sender.js 几次,你会看到消息通过接收者传来 - 一个接收者得到一条消息,rabbitmq 会将消息轮询到接收者。


附言如果您有兴趣,此代码来自我的RabbitMQ For Devs 包中的截屏视频。

【讨论】:

以上是关于向多个 RabbitMQ 消费者发布消息的主要内容,如果未能解决你的问题,请参考以下文章

用于 FanOut 消息传递的 RabbitMQ 工作队列

5.RabbitMQ 客户端控制消息

RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

千万PV----RabbitMQ集群配置

RabbitMQ 跨多个队列的多个消费者 - 消息延迟处理

RabbitMQ发布订阅模式