RabbitMQ 和通道 Java 线程安全
Posted
技术标签:
【中文标题】RabbitMQ 和通道 Java 线程安全【英文标题】:RabbitMQ and channels Java thread safety 【发布时间】:2015-08-22 02:33:40 【问题描述】:在本指南中https://www.rabbitmq.com/api-guide.html RabbitMQ 伙计们表示:
通道和并发注意事项(线程安全)
通道实例不能在线程之间共享。应用程序应该更喜欢每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel。虽然通道上的某些操作可以安全地并发调用,但有些则不能,并且会导致线路上的帧交错不正确。在线程之间共享频道也会干扰* Publisher Confirms。
线程安全非常重要,所以我尽可能地勤奋,但问题是:
我有这个从 Rabbit 接收消息的应用程序。收到消息后,它会对其进行处理,然后在完成后确认。应用程序只能在具有 2 个线程的固定线程池中同时处理 2 个项目。 Rabbit 的 QOS 预取设置为 2,因为我不想为应用提供超过它在一段时间内可以处理的内容。
现在,我的消费者的 handleDelivery 执行以下操作:
Task run = new Task(JSON.parse(message));
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
此时,您已经发现 TestWrapperThread 将 channel.basicAck(deliveryTag, false);
调用作为最后一个操作。
根据我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,并且这种行为可能会搞砸。但是我应该怎么做呢?我的意思是,我有一些想法,但它们会使一切变得更加复杂,我想弄清楚它是否真的有必要。
提前致谢
【问题讨论】:
如果我理解,你的问题是关于多线程 channel.basicAck(deliveryTag, false);对吗? 正确。这是我唯一的担心 【参考方案1】:我想您仅将Channel
用于您的消费者,而不用于发布等其他操作。
就你而言,唯一的潜在问题在这里:
channel.basicAck(deliveryTag, false);
因为你跨两个线程调用这个,顺便说一句,这个操作是安全的,如果你看到 java 代码:
ChannelN.java
类调用:
public void basicAck(long deliveryTag, boolean multiple)
throws IOException
transmit(new Basic.Ack(deliveryTag, multiple));
see github code for ChannelN.java
AMQChannel 内部的transmit
方法使用:
public void transmit(Method m) throws IOException
synchronized (_channelMutex)
transmit(new AMQCommand(m));
_channelMutex
是protected final Object _channelMutex = new Object();
用类创建。 see github code for AMQChannel.java
编辑
正如您在官方文档中看到的那样,“一些”操作是线程安全的,现在还不清楚哪些操作。 我研究了代码,我认为跨多个线程调用 ACK 没有问题。
希望对您有所帮助。
EDIT2 我还添加了 Nicolas 的评论:
请注意,从多个线程消费 (basicConsume) 和确认是 Java 客户端已经使用的常见 rabbitmq 模式。
这样你就可以放心使用了。
【讨论】:
只有消费者。所以你基本上确认它不应该是一个问题?您是否认为文档实际上意味着“在混杂的接收/发送情况下保持线程安全”? 谢谢加斯。我想我有一个理论,为什么他们基本上说你不应该多线程一个通道,也明确提到了确认操作。我认为问题在于,如果您在各种线程中大量使用通道,在大量数据的发送操作中,您基本上会阻塞通道,直到它们完成。现在,如果您还需要发回确认,它们可能会延迟很多,可能会让您的系统在实际上不需要时触发“恢复”过程。 请注意,从多个线程消费 (basicConsume) 和 acking 是 java 客户端已经使用的常见 rabbitmq 模式。 @SimonePezzano 更多的是关于incorrect frame interleaving on the wire.
而不是阻塞。
感谢大家的大力帮助。在这一点上,我所要做的就是试图弄清楚 RabbitMQ 上下文中的帧交错实际上是什么。我发现了。每次您发布非空消息时,您的客户端都会在线发送 3 个(或更多)帧:[method: basic.publish] [content headers] [content body]+
从多个线程在同一通道上发布时,您最终可能会出现不正确的交错,例如[method: basic.publish] [content headers] [method: basic.publish] [content body] [content headers] [content body]
一切都清楚了,谢谢!以上是关于RabbitMQ 和通道 Java 线程安全的主要内容,如果未能解决你的问题,请参考以下文章