使用多线程和通道发布时,线程都被阻塞,rabbitmq

Posted

技术标签:

【中文标题】使用多线程和通道发布时,线程都被阻塞,rabbitmq【英文标题】:Threads are all blocked when publish with multi threads and channels , rabbitmq 【发布时间】:2018-07-21 13:46:33 【问题描述】:

我在我的产品环境中发现了一个问题。

我们在一个 mq 集群中有 6 个队列,我们​​有 200 个线程的线程池(实际上会更多,因为它会在独立的线程池中调度一些特殊任务)来处理来自上游的请求,在处理请求时,我将向 rabbitmq 代理发布消息。

所以我有 200 个线程将消息发布到这 6 个队列。

对于每个队列,我将创建一个 AMQP 连接,并且对于每个线程,我都有一个 Channel 的 threadlocal,这样每个线程都可以拥有自己的通道而无需同步,因为通道不是线程安全的。

所以,实际上,我将开放 1200 个频道。 requests qps在4000/s左右,特殊时期会大一些。

但是我发现200个线程都用完了,而且大部分都处于阻塞状态,比如:

    DubboServerHandler-10.12.26.124:9000-thread-200 - priority:10 - threadId:0x00007f6708030800 - nativeId:0x680d - state:BLOCKED
    stackTrace:
    java.lang.Thread.State: BLOCKED (on object monitor)
    at com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:170)
    - waiting to lock <0x0000000738ad0190> (a java.io.DataOutputStream)
    at com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:542)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:104)
    - locked <0x000000074e085338> (a com.rabbitmq.client.impl.CommandAssembler)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:337)
    - locked <0x000000074656eeb0> (a java.lang.Object)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:313)
    - locked <0x000000074656eeb0> (a java.lang.Object)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:686)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:668)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:658)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:192)

这是我的 jstack 报告:http://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMTgvMDIvMTEvLS0yNjE3OS50eHQtLTMtNTMtMzg=

我的问题是:

  1.Why I have different channels to publish but they are all trying acquire the same lock
  2.What will be the cause for this since this only happens tens of times in a day
  3. Do I use a poor implementations for this? How can I improve it.

【问题讨论】:

Jaskey - 我建议您将“ilooner”提供的答案标记为正确,并在此处继续讨论 RabbitMQ 邮件列表 - groups.google.com/d/topic/rabbitmq-users/15cv2qroCps/discussion 【参考方案1】:
    看源码每个连接都有一个SocketFrameHandler。套接字帧处理程序在此处https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java 的 writeFrame 方法中同步输出流。这基本上意味着如果您有 200 个通道(每个线程一个)共享一个连接,由于 write 方法中的同步,一次只有一个线程能够发送数据。 我不确定。您通常每秒向rabbitmq 发送多少条消息?一天中是否有一段时间您发送的消息很少,而其他时段您发送的消息很多? 由于 SocketFrameHandler 中的同步,我认为每个连接拥有多个线程和通道没有任何好处。尝试重构您的应用程序,以便将任何需要发送的数据送入内存队列,并且一个线程负责从队列中读取数据并将数据发送到rabbitmq。这样一来,您可以让多个线程工作并生成数据,而一个线程负责发送数据。

【讨论】:

我有超过 4000/s 有时 10000/s 到 mq。对于我的应用程序的一个线程和一个连接的建议,这不是一个令人满意的设计,因为我必须确保我的消息在我响应我的请求者后不会丢失。如果我有一个内存队列并使用它来发布到 mq,当我的应用程序崩溃时消息可能会丢失

以上是关于使用多线程和通道发布时,线程都被阻塞,rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

Python多线程编程

java NIO

NIO 网络通信

HostApduService (NFC) 中的 Android 多线程

在多线程上下文中,在托管对象上设置属性时,CoreData 阻塞

折腾了我一周,原来Netty网络编程就是这么个破玩意儿!!!