将“回复”队列中的消息复制到另一个队列

Posted

技术标签:

【中文标题】将“回复”队列中的消息复制到另一个队列【英文标题】:Duplicate messages from "reply-to" queues to another queue 【发布时间】:2020-01-29 19:28:50 【问题描述】:

我主要在 RPC 模式下使用 rabbitMq,但我也想将请求和响应消息复制到另一个队列。

最后,我想要实现的是,外部消费者可以通过侦听队列来查看所有流量,我们称之为“日志队列”。

复制传入的消息没问题,我只需要使用扇出交换器或将我的日志队列绑定到使用与 RPC 调用使用的路由密钥相同的交换器。

但我无法找到一种方法来“扇出”通过直接回复功能发送的消息。

到目前为止,我了解到响应消息是通过生成的 routing_key 以 amqp.rabbitmq.reply-to.generatedName 的形式发送到默认直接交换的,并且由于默认交换是不可触及的,因此我无法复制这些消息。

你知道有什么方法吗?

我有一个我宁愿避免的解决方案:让客户端将从回复接收到的响应重新发送到“日志队列”。

但这意味着我的客户负责这个“记录”功能,我宁愿不这样做。

顺便说一句,即使我认为它不相关,因为这可能是rabbitMq服务器配置问题,我使用Spring-AMQP客户端

【问题讨论】:

同样的问题,也用于记录目的。很遗憾,除了更改代码或通过设置中间交换或您的建议来影响性能之外,可能没有真正的选择...... 【参考方案1】:

你不能用固定的回复来做到这一点,因为没有真正的队列/交换。

但是,您可以将每个 RabbitTemplate 配置为使用一个固定的回复队列和一个回复容器来将来自该队列的回复定向到模板。

Documentation here.

此外,在使用该机制时,您可以将模板的replyAddress配置为交换和路由密钥的形式。

/**
 * An address for replies; if not provided, a temporary exclusive, auto-delete queue will
 * be used for each reply, unless RabbitMQ supports 'amq.rabbitmq.reply-to' - see
 * https://www.rabbitmq.com/direct-reply-to.html
 * <p>The address can be a simple queue name (in which case the reply will be routed via the default
 * exchange), or with the form @code exchange/routingKey to route the reply using an explicit
 * exchange and routing key.
 * @param replyAddress the replyAddress to set
 */
public synchronized void setReplyAddress(String replyAddress) ...

您只需照常设置模板和容器,使模板成为容器的侦听器...

@Bean
public RabbitTemplate amqpTemplate() 
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(msgConv());
    rabbitTemplate.setReplyAddress(replyQueue().getName());
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;


@Bean
public SimpleMessageListenerContainer replyListenerContainer() 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(amqpTemplate());
    return container;

同样,这是来自文档。

【讨论】:

感谢您的回答。我已经看到了这种方式,并且正在寻找一个完整的rabbitmq服务器配置方式来解决我的问题。好吧,如果不可能,那就不是。感谢您的确认【参考方案2】:

我找到了实现我想要的方法。

这不像 Gary Russell 提议的那样灵活:https://***.com/a/59976806/2546702

但是,我可以利用 firehose 功能并在 amq.rabbitmq.trace 交换上绑定一个队列(或一个交换以获得更多控制权),并将 routingkey 设置为“发布”。 (最后的点很重要)

这允许记录发布到默认交换的消息,包括回复消息。

当然,使用 firehose 会对性能产生影响,但就我而言,这并不是什么大问题,因为 rabbitmq 没有得到充分利用。

由于我有 16 个队列要监听,我宁愿不要为每个队列使用不同的模板。我可以为所有 RPC 队列使用一个回复队列,但这会是一个瓶颈。

所以,如果没有硬 NOGO,firehose 似乎是一个不错的选择。

【讨论】:

以上是关于将“回复”队列中的消息复制到另一个队列的主要内容,如果未能解决你的问题,请参考以下文章

如何将消息复制到 RabbitMQ 上的另一个队列?

表复制到其他数据库的高级队列

一条 JMS 消息复制到两个队列

分布式消息队列之kafka

如何将 Azure 存储帐户内容(表、队列、blob)复制到其他存储帐户

使用 Powershell 将 MSMQ 消息从一个队列移动到另一个队列