JBoss 和 ActiveMQ 之间挂起的 Socket.read() 线程
Posted
技术标签:
【中文标题】JBoss 和 ActiveMQ 之间挂起的 Socket.read() 线程【英文标题】:Socket.read() thread hanging between JBoss and ActiveMQ 【发布时间】:2013-02-24 11:19:16 【问题描述】: 给定 我的 Java 应用是部署到 JBoss (4.0.4GA) 的 WAR 发布和订阅 ActiveMQ (5.6.0) 实例 Java 应用程序使用 Apache Camel (2.10.3) 与 ActiveMQ 进行所有集成(生产和消费) JBoss 和 ActiveMQ 在自己的(CentOS 5.6 Final)四核虚拟服务器上,每个虚拟服务器位于不同的物理服务器上我遇到了线程挂起问题,并且在我的线程转储中看到以下内容:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:129)
java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
java.io.BufferedInputStream.read(BufferedInputStream.java:317)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:632)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1195)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
org.springframework.remoting.httpinvoker.SimpleHttpInvokerRequestExecutor.validateResponse(SimpleHttpInvokerRequestExecutor.java:146)
org.springframework.remoting.httpinvoker.SimpleHttpInvokerRequestExecutor.doExecuteRequest(SimpleHttpInvokerRequestExecutor.java:66)
org.springframework.remoting.httpinvoker.AbstractHttpInvokerRequestExecutor.executeRequest(AbstractHttpInvokerRequestExecutor.java:136)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.executeRequest(HttpInvokerClientInterceptor.java:192)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.executeRequest(HttpInvokerClientInterceptor.java:174)
org.springframework.remoting.httpinvoker.HttpInvokerClientInterceptor.invoke(HttpInvokerClientInterceptor.java:142)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
$Proxy117.SigmaCruxer(Unknown Source)
com.tms.SigmaClient.SigmaClient.processMessage(SigmaClient.java:46)
com.tms.SigmaClient.SigmaServiceEndpoint.doSigma(SigmaServiceEndpoint.java:29)
com.tms.SigmaClient.SigmaServiceEndpoint.mark(SigmaServiceEndpoint.java:43)
sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
java.lang.reflect.Method.invoke(Method.java:597)
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:329)
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:231)
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:169)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:74)
org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:102)
org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:72)
org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:50)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:114)
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:284)
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:109)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:99)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:318)
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:209)
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:305)
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:102)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)
org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:91)
org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
根据这两篇文章:(here 和 here),我的 JBoss 应用程序在 Socket.read()
上有一个阻塞 I/O 操作,它正在等待来自下游服务提供商的完整响应(在我的例子中,活动MQ)。同样,根据这些文章,罪魁祸首是以下 3 项之一:
我正在尝试找出这三个中的哪一个。该线程转储中是否有任何内容表明它是哪一个?我的理解(在阅读了那些文章之后)是 真正的挂起是客户端(阻塞)套接字刚刚没有收到它需要考虑的所有字节的事实响应完成;表示它没有收到来自 ActiveMQ 的任何响应,或者它只是没有收到完整的响应。
所以我问:
-
是否有明确的迹象表明这 3 种情况中的哪一种是这种情况?如果是这样,是什么/为什么?如果没有,应该我的下一步是什么(我也是设置 ActiveMQ 的“管理员”,因此我可以完全访问它以及 JBoss 和部署到它的 WAR)。
升级到更新的 JBoss 会解决这个问题吗?也许 4.0.4GA 正在使用“旧”(阻塞)Java I/O,而新版本可能使用 NIO?可能是一个远景,但还不能抹黑它。
两篇文章都强调应该实施正确的套接字超时配置,这可以很好地缓解所有这些问题(尽管它没有解决潜在的 ActiveMQ 无响应和/或网络问题):
-
这是我在 Java 代码中编写的超时吗?如果是这样,如何以及使用什么 API? JMS?一些 ActiveMQ 客户端 jar?
这是我在操作系统级别实现的超时吗?如果是这样,我不确定如何继续...
这是我在服务器端 (ActiveMQ) 实现的超时吗?如果有,怎么做?
我认为我正在接近解决方案,但有点卡住并且很难通过树木看到森林。提前致谢!
【问题讨论】:
从堆栈跟踪中,无论正在运行什么,camel 都已经收到来自 activemq 的消息,并且该消息已被分派给 camel 中的侦听器(onMessage 调用)。 Camel 已经完成了一些逻辑,并且正在进行 SigmaCruxer 调用,它看起来像一个 Web 服务调用。正是该调用在堆栈跟踪中读取的套接字上被阻塞。尝试联系的 Web 服务调用是什么,它是否活跃且响应迅速?这似乎不是一个 activemq 端点 - 你确定你的套接字在 activemq 上被阻塞了吗? 【参考方案1】:我在使用 JBoss(和 Glassfish)和 ActiveMQ 方面有一些经验,但我以前从未使用过 Camel(但对 Mule 很熟悉,我读到过类似的)。
您的堆栈跟踪看起来像是 Camel 试图将 ActiveMQ(跟踪底部的 JMS 内容)链接到 Web 端点(跟踪顶部的 HTTP 内容)。
我对 Camel 的运行位置(CamelContext)有点困惑。您说您有两台虚拟机,一台用于 JBoss,一台用于 ActiveMQ。在我的例子中,我们使用我们的 ActiveMQ 在机器上运行 Mule ESB。你的骆驼在哪里跑?
您的堆栈跟踪看起来最像第一篇文章中的问题 #1。就好像骆驼部分无法“看到”网络端点。检查您的 WAR 是否已正确部署,以及您的 Web 端点 (WSDL) 在两个虚拟机中是否可见。检查您的端点;也许一个被设置为 localhost 或其他东西,这将不允许它到达另一台机器。
很难判断是阅读不完整还是完全无法阅读。有数据通过吗? Web 服务器可能会慢慢过载并且无法跟上请求(并且像您的错误一样饿死一些线程)。当您响应缓慢或请求很多时,套接字超时变得很重要;如果您可以创建一个简单的测试(快速且请求很少),那么您至少可以验证您是否具有基本连接。什么数据输入(测试)导致了这个错误?
如果有更多意见,我很乐意尝试改进此答案。 (很抱歉,我会尝试对您的问题发表评论,但我认为我还没有代表...)
【讨论】:
感谢@Seka (+1) - 为您提供更多信息。 Camel 路由在部署到 JBoss 的 WAR 中“存在”(已启动)。 Camel 路由作为 JMS 客户端,ActiveMQ 是 JMS 服务器。这只是偶尔发生,Camel 和 ActiveMQ 之间 99.99% 的发布/订阅通过 OK。问题是,每隔一段时间(几百万条消息),我就会看到socketRead0
(阻塞读取)并且骆驼线程开始挂起。它挂起是因为它正在等待来自 ActiveMQ 的完整响应而没有得到它。
消息是可变大小的吗?值得知道是大单消息还是大量并发消息导致挂起。对于前者,增加套接字超时会有所帮助;对于后者,增加工作线程的数量可能会有所帮助。从堆栈跟踪来看,Camel 线程似乎挂在套接字(Web)端而不是 JMS 端,就像它试图从套接字拉下数据并将其发送到 JMS,但没有获取数据(或至少没有及时)。
再次感谢@Seka (+1) - 这是一个应用程序,它有 64 个使用者线程在 ActiveMQ 实例上的队列上侦听。消息一入队,64 个消费者中的一个就会拿起它并开始处理它。这些是大小略有不同的消息(它们的大小都有点不同,但通常是相同的),每天有数十万条消息;有时甚至更多。 好奇:是什么让你认为这是挂在 Web 端的?这是否意味着 ActiveMQ(服务器)?你会建议我如何开始诊断?再次感谢!
让我建议 Web 端的是线程挂在套接字读取中,好像 Camel 正在某个套接字端点和 JMS 队列之间创建连接,而套接字端没有提供数据.但是,我对您的系统了解得越多,可能是套接字是内部 Camel 的东西,试图从您的 WAR 中获取数据并返回到 ActiveMQ(反之亦然)。
您的数据是否源自网络?就像 Web 请求触发消息和消费者线程处理的东西一样吗?我在想可能正在创建第 65 个并发请求,并且由于您的工作人员都会很忙,因此该线程可能会挂起(因为没有消费者线程正在处理其请求而被饿死)。如果消息是由 Web 端触发的,则可以限制 JBoss 中的并发 Web 连接数(如果请求多于工作线程没有意义)。当然,这取决于您的系统和要求。以上是关于JBoss 和 ActiveMQ 之间挂起的 Socket.read() 线程的主要内容,如果未能解决你的问题,请参考以下文章
带有 QSharedMemory 的 IPC 和如果进程之一挂起的风险
vs2013-执行签入挂起的更改,提示没有挂起的更改(实际代码已修改)