RabbitMQ 和通道 Java 线程安全

Posted

技术标签:

【中文标题】RabbitMQ 和通道 Java 线程安全【英文标题】:RabbitMQ and channels Java thread safety 【发布时间】:2015-08-22 02:33:40 【问题描述】:

在本指南中https://www.rabbitmq.com/api-guide.html RabbitMQ 伙计们表示:

通道和并发注意事项(线程安全)

通道实例不能在线程之间共享。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel。虽然通道上的某些操作可以安全地并发调用,但有些则不能,并且会导致线路上的帧交错不正确。在线程之间共享频道也会干扰* Publisher Confirms。

线程安全非常重要,所以我尽可能地勤奋,但问题是:

我有这个从 Rabbit 接收消息的应用程序。收到消息后,它会对其进行处理,然后在完成后确认。应用程序只能在具有 2 个线程的固定线程池中同时处理 2 个项目。 Rabbit 的 QOS 预取设置为 2,因为我不想为应用提供超过它在一段时间内可以处理的内容。

现在,我的消费者的 handleDelivery 执行以下操作:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

此时,您已经发现 TestWrapperThread 将 channel.basicAck(deliveryTag, false); 调用作为最后一个操作。

根据我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,并且这种行为可能会搞砸。但是我应该怎么做呢?我的意思是,我有一些想法,但它们会使一切变得更加复杂,我想弄清楚它是否真的有必要。

提前致谢

【问题讨论】:

如果我理解,你的问题是关于多线程 channel.basicAck(deliveryTag, false);对吗? 正确。这是我唯一的担心 【参考方案1】:

我想您仅将Channel 用于您的消费者,而不用于发布等其他操作。

就你而言,唯一的潜在问题在这里:

channel.basicAck(deliveryTag, false);

因为你跨两个线程调用这个,顺便说一句,这个操作是安全的,如果你看到 java 代码:

ChannelN.java 类调用:

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException

   transmit(new Basic.Ack(deliveryTag, multiple));

see github code for ChannelN.java

AMQChannel 内部的transmit 方法使用:

public void transmit(Method m) throws IOException 
   synchronized (_channelMutex) 
       transmit(new AMQCommand(m));
   

_channelMutexprotected final Object _channelMutex = new Object();

用类创建。 see github code for AMQChannel.java

编辑

正如您在官方文档中看到的那样,“一些”操作是线程安全的,现在还不清楚哪些操作。 我研究了代码,我认为跨多个线程调用 ACK 没有问题。

希望对您有所帮助。

EDIT2 我还添加了 Nicolas 的评论:

请注意,从多个线程消费 (basicConsume) 和确认是 Java 客户端已经使用的常见 rabbitmq 模式。

这样你就可以放心使用了。

【讨论】:

只有消费者。所以你基本上确认它不应该是一个问题?您是否认为文档实际上意味着“在混杂的接收/发送情况下保持线程安全”? 谢谢加斯。我想我有一个理论,为什么他们基本上说你不应该多线程一个通道,也明确提到了确认操作。我认为问题在于,如果您在各种线程中大量使用通道,在大量数据的发送操作中,您基本上会阻塞通道,直到它们完成。现在,如果您还需要发回确认,它们可能会延迟很多,可能会让您的系统在实际上不需要时触发“恢复”过程。 请注意,从多个线程消费 (basicConsume) 和 acking 是 java 客户端已经使用的常见 rabbitmq 模式。 @SimonePezzano 更多的是关于incorrect frame interleaving on the wire. 而不是阻塞。 感谢大家的大力帮助。在这一点上,我所要做的就是试图弄清楚 RabbitMQ 上下文中的帧交错实际上是什么。我发现了。每次您发布非空消息时,您的客户端都会在线发送 3 个(或更多)帧:[method: basic.publish] [content headers] [content body]+ 从多个线程在同一通道上发布时,您最终可能会出现不正确的交错,例如[method: basic.publish] [content headers] [method: basic.publish] [content body] [content headers] [content body] 一切都清楚了,谢谢!

以上是关于RabbitMQ 和通道 Java 线程安全的主要内容,如果未能解决你的问题,请参考以下文章

gRPC 中的通道/存根是线程安全的吗

使用多线程和通道发布时,线程都被阻塞,rabbitmq

RabbitMQ - 关闭空闲/悬空通道

什么是线程安全,实现线程安全都有哪些方法

将缓冲区写入 Java 通道:线程安全与否?

不会吧,你连Java 多线程线程安全都还没搞明白,难怪你面试总不过