记一次RocketMQConsumer 服务关闭出现InterruptException异常
Posted 谁动了我的小老弟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了记一次RocketMQConsumer 服务关闭出现InterruptException异常相关的知识,希望对你有一定的参考价值。
记一次RocketMQConsumer 服务关闭出现InterruptException异常
背景提要
出现问题主要还是版本升级
-
老版本核心rocketmq依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> <version>$vesion</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
-
新版本核心rocketmq依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
java.lang.InterruptedException
简单列举一个InterruptedException
java.sql.SQLException: interrupt
at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1430) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1272) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5007) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.filter.FilterAdapter.dataSource_getConnection(FilterAdapter.java:2745) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.filter.stat.StatFilter.dataSource_getConnection(StatFilter.java:680) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1250) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1242) ~[druid-1.1.12.jar!/:1.1.12]
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:89) ~[druid-1.1.12.jar!/:1.1.12]
// 省略部分堆栈信息
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
at cn.techwolf.trace.rocketmq.spring.TracingMessageListenerConcurrently.consumeMessage(TracingMessageListenerConcurrently.java:37) [instrument-rocketmq-spring-1.101.jar!/:1.101]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.2.jar!/:4.9.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.InterruptedException
注意
以下分析仅为个人观点,并不一定正确,如有杠精,请勿继续观看,欢迎留言讨论哈
提示:本文涉及的一些类,在 记一次RocketMQ服务启动时 NullPointerException问题 本文不做一些详细解释
spring关闭&rocketmq关闭时机(穿插 并不重要)
-
spring
-
rocketmq 继承了SmartCycle 会在容器关闭的时候 回调其stop方法
rocketmq shutdown 分析
首先确认我们的RocketMQConsumer 的实现 DefaultRocketMQListenerContainer
:
// 类全路径org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware
private DefaultMQPushConsumer consumer;
@Override
public void destroy() // DisposableBean 回调 销毁bean的时候 调用
this.setRunning(false);
if (Objects.nonNull(consumer))
consumer.shutdown();
log.info("container destroyed, ", this.toString());
@Override
public void stop() // SmartLifecycle 回调 关闭容器前回调
if (this.isRunning())
if (Objects.nonNull(consumer))
consumer.shutdown();
setRunning(false);
可以看到关闭的话 是先调用到 DefaultRocketMQListenerContainer.stop
方法, 接下来就是看看consumer.shutdown()
方法了:
// 类全路径 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
private long awaitTerminationMillisWhenShutdown = 0;
@Override
public void shutdown()
this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
if (null != traceDispatcher)
traceDispatcher.shutdown();
// 类全路径 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public class DefaultMQPushConsumerImpl implements MQConsumerInner
private ConsumeMessageService consumeMessageService;
public synchronized void shutdown(long awaitTerminateMillis)
switch (this.serviceState)
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
// ConsumeMessageService 有两个实现类 分别是 ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService
// 我们使用的 consumeMessageService 的具体实现类是 ConsumeMessageConcurrentlyService 具体问题具体分析哈
// 类全路径 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ThreadPoolExecutor consumeExecutor;
public void shutdown(long awaitTerminateMillis)
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
this.cleanExpireMsgExecutors.shutdown();
// 类全路径org.apache.rocketmq.common.utils.ThreadUtils
public final class ThreadUtils
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit)
// Disable new tasks from being submitted.
executor.shutdown();
try
// Wait a while for existing tasks to terminate.
if (!executor.awaitTermination(timeout, timeUnit)) // 注意这里
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled.
if (!executor.awaitTermination(timeout, timeUnit))
log.warn(String.format("%s didn't terminate!", executor));
catch (InterruptedException ie)
// (Re-)Cancel if current thread also interrupted.
executor.shutdownNow();
// Preserve interrupt status.
Thread.currentThread().interrupt();
从上面我们可以清楚的看到 执行到 ConsumeMessageConcurrentlyService.shutdown
的时候,awaitTerminateMillis
默认值是0, 执行到 ThreadUtils.shutdownGracefully
时,会直接调用shutdonwNow
,并没有等待
shutdonw 和 shutdownNow 的区别 百度搜一搜就行了
- shutdown => 平缓关闭,等待所有已添加到线程池中的任务执行完在关闭
- shutdownNow => 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务
对比以前代码 rocketmq-client 4.3.2
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private final ThreadPoolExecutor consumeExecutor; public void shutdown(long awaitTerminateMillis) this.scheduledExecutorService.shutdown(); this.consumeExecutor.shutdown(); // 这里 this.cleanExpireMsgExecutors.shutdown();
故 我认为是,是因为直接调用了 shutdonwNow
导致服务关闭的时候出现中断 InterruptException 异常
解决方案
从上述分析可以知道 是因为直接调用shutdownNow 导致的,我们应该可以调整 awaitTerminateMillis
参数,也就是DefaultMQPushConsumer.awaitTerminationMillisWhenShutdown
参数
而目前我似乎没看到有哪种方式支持全局配置该参数的方式(官方方式) 这里提供俩思路
RocketMQPushConsumerLifecycleListener
or RocketMQPushConsumerLifecycleListener
// org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware
@Override
public void afterPropertiesSet() throws Exception
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: ", messageType);
private void initRocketMQPushConsumer() throws MQClientException
// 省略部分代码
if (Objects.nonNull(rpcHook))
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
else
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
// 省略部分consumer 参数配置代码
// 可以看到这 他会回调 prepareStart 方法
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener)
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener)
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
可以看到 在DefaultRocketMQListenerContainer
初始化完后会回调 RocketMQPushConsumerLifecycleListener、RocketMQPushConsumerLifecycleListener
的prepareStart
方法
那就很简单了
@Service
@RocketMQMessageListener(nameServer = "$spring.rocketmq.nameServer",
topic = "$topic",
consumerGroup = "$group")
public class TestConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener
@Override
public void onMessage(String msg)
// do something
@Override
public void prepareStart(DefaultMQPushConsumer consumer)
consumer.setAwaitTerminationMillisWhenShutdown(1000); // 设置
可以抽象成一个通用类 consumer 继承该类就行了
DefaultRocketMQListenerContainer.getConsumer
DefaultRocketMQListenerContainer
会被注册成bean 具体的实现在 org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration
中,这里就不分析了,我们可以尝试获取所有的DefaultRocketMQListenerContainer
然后调用其getConsumer
方法 示例代码 具体怎么触发这个代码 就得自己思考和完善了哈 本文不做解释了
public void set()
List<DefaultRocketMQListenerContainer> containers = getAllBeans();
for (DefaultRocketMQListenerContainer c : containers)
c.getConsumer().setAwaitTerminationMillisWhenShutdown(1000);
最后,以上仅为本人分析,并不一定正确,用第一种方式确实没出现了InterruptException了,不知道是否是偶然;
欢迎留言讨论
以上是关于记一次RocketMQConsumer 服务关闭出现InterruptException异常的主要内容,如果未能解决你的问题,请参考以下文章