Notes on an InterruptedException When Shutting Down a RocketMQ Consumer Service

Posted by 谁动了我的小老弟

Background

The problem appeared mainly after upgrading the RocketMQ dependencies:

  1. Core RocketMQ dependencies (old versions)

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>spring-boot-starter-rocketmq</artifactId>
        <version>${version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.2</version>
    </dependency>
    
  2. Core RocketMQ dependencies (new versions)

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    

java.lang.InterruptedException

Here is a typical example of the InterruptedException we saw:

java.sql.SQLException: interrupt
	at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1430) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1272) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5007) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.filter.FilterAdapter.dataSource_getConnection(FilterAdapter.java:2745) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.filter.stat.StatFilter.dataSource_getConnection(StatFilter.java:680) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1250) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1242) ~[druid-1.1.12.jar!/:1.1.12]
	at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:89) ~[druid-1.1.12.jar!/:1.1.12]
	// part of the stack trace omitted
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) [rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
	at cn.techwolf.trace.rocketmq.spring.TracingMessageListenerConcurrently.consumeMessage(TracingMessageListenerConcurrently.java:37) [instrument-rocketmq-spring-1.101.jar!/:1.101]
	at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.2.jar!/:4.9.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.InterruptedException

Note

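The analysis below is only my personal view and may not be correct. If you are just here to argue, feel free to stop reading; otherwise, comments and discussion are welcome!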

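Tip: some of the classes involved here were covered in my earlier post "Notes on a NullPointerException When Starting a RocketMQ Service" (记一次RocketMQ服务启动时 NullPointerException问题), so they are not explained in detail in this article.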

When Spring and RocketMQ shut down (a side note, not essential)

  1. Spring

  2. RocketMQ: DefaultRocketMQListenerContainer implements SmartLifecycle, so Spring calls back its stop() method when the container shuts down.

Analyzing the RocketMQ shutdown

First, let's confirm the implementation behind our RocketMQ consumer, DefaultRocketMQListenerContainer:

// Fully qualified name: org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private DefaultMQPushConsumer consumer;

    @Override
    public void destroy() { // DisposableBean callback, invoked when the bean is destroyed
        this.setRunning(false);
        if (Objects.nonNull(consumer)) {
            consumer.shutdown();
        }
        log.info("container destroyed, {}", this.toString());
    }

    @Override
    public void stop() { // SmartLifecycle callback, invoked before the container shuts down
        if (this.isRunning()) {
            if (Objects.nonNull(consumer)) {
                consumer.shutdown();
            }
            setRunning(false);
        }
    }
}

So on shutdown, DefaultRocketMQListenerContainer.stop() is called first. Next, let's look at consumer.shutdown():

// Fully qualified name: org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    /**
     * Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
     */
    private long awaitTerminationMillisWhenShutdown = 0;

    @Override
    public void shutdown() {
        this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
        if (null != traceDispatcher) {
            traceDispatcher.shutdown();
        }
    }
}

// Fully qualified name: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private ConsumeMessageService consumeMessageService;

    public synchronized void shutdown(long awaitTerminateMillis) {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.consumeMessageService.shutdown(awaitTerminateMillis);
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.destroy();
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
}

// ConsumeMessageService has two implementations: ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService.
// Our consumer uses ConsumeMessageConcurrentlyService, so that is the one analyzed here.
// Fully qualified name: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private final ThreadPoolExecutor consumeExecutor;

    public void shutdown(long awaitTerminateMillis) {
        this.scheduledExecutorService.shutdown();
        ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
        this.cleanExpireMsgExecutors.shutdown();
    }
}

// Fully qualified name: org.apache.rocketmq.common.utils.ThreadUtils
public final class ThreadUtils {
    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
        // Disable new tasks from being submitted.
        executor.shutdown();
        try {
            // Wait a while for existing tasks to terminate.
            if (!executor.awaitTermination(timeout, timeUnit)) { // note this line
                executor.shutdownNow();
                // Wait a while for tasks to respond to being cancelled.
                if (!executor.awaitTermination(timeout, timeUnit)) {
                    log.warn(String.format("%s didn't terminate!", executor));
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted.
            executor.shutdownNow();
            // Preserve interrupt status.
            Thread.currentThread().interrupt();
        }
    }
}

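From the code above we can see that when ConsumeMessageConcurrentlyService.shutdown runs, awaitTerminateMillis defaults to 0. Inside ThreadUtils.shutdownGracefully, awaitTermination(0, ...) returns false immediately while any message is still being consumed, so shutdownNow() is called right away without any waiting.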
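To make this concrete, here is a minimal JDK-only sketch (not RocketMQ code) of the first branch of shutdownGracefully with a timeout of 0. The class name, the single-thread pool standing in for consumeExecutor, and the 300 ms "message" are assumptions for illustration. Because awaitTermination(0, ...) returns false while the task is still running, shutdownNow() interrupts the consuming thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ZeroTimeoutShutdownDemo {

    /** Returns true if the "message" still being consumed was interrupted by the shutdown. */
    static boolean inFlightTaskInterrupted() {
        // Hypothetical stand-in for consumeExecutor; one thread is enough to show the effect.
        ExecutorService consumeExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        consumeExecutor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(300); // simulated slow business logic, e.g. waiting for a DB connection
            } catch (InterruptedException e) {
                interrupted.set(true); // in the real stack trace, Druid wrapped this as SQLException: interrupt
            }
        });

        try {
            started.await(); // make sure the message is being processed before shutting down
            // First branch of ThreadUtils.shutdownGracefully with timeout = 0:
            consumeExecutor.shutdown();
            if (!consumeExecutor.awaitTermination(0, TimeUnit.MILLISECONDS)) { // returns false at once
                consumeExecutor.shutdownNow(); // interrupts the thread that is still consuming
            }
            consumeExecutor.awaitTermination(5, TimeUnit.SECONDS); // let the demo finish cleanly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("in-flight message interrupted: " + inFlightTaskInterrupted());
    }
}
```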

The difference between shutdown and shutdownNow is easy to look up; in short:

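  • shutdown => graceful close: no new tasks are accepted, but tasks already submitted to the pool (running or queued) are allowed to finish. shutdown() itself does not block waiting for them.
  • shutdownNow => immediate close: tries to stop running tasks by interrupting their threads, and returns the queued tasks that never started.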
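A small JDK-only sketch of the difference (class and method names are made up for illustration): one worker thread, three 100 ms tasks, and a shutdown issued while the first task is running.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownVsShutdownNow {

    /** Returns {completed, interrupted, returnedFromQueue} after shutting down the pool. */
    static int[] run(boolean now) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger interrupted = new AtomicInteger();
        CountDownLatch firstStarted = new CountDownLatch(1);

        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                firstStarted.countDown();
                try {
                    Thread.sleep(100); // simulated work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                }
            });
        }

        int returned = 0;
        try {
            firstStarted.await(); // task 1 is running, tasks 2 and 3 are still queued
            if (now) {
                List<Runnable> neverRun = pool.shutdownNow(); // interrupts task 1, hands back 2 and 3
                returned = neverRun.size();
            } else {
                pool.shutdown(); // rejects new tasks, but everything already queued still runs
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), interrupted.get(), returned};
    }

    public static void main(String[] args) {
        System.out.println("shutdown:    " + Arrays.toString(run(false)));
        System.out.println("shutdownNow: " + Arrays.toString(run(true)));
    }
}
```

With shutdown() all three tasks complete; with shutdownNow() the running task is interrupted and the two queued tasks are returned without ever running.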

Compare this with the earlier code in rocketmq-client 4.3.2:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private final ThreadPoolExecutor consumeExecutor;

    public void shutdown(long awaitTerminateMillis) {
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown(); // here: plain shutdown(), no shutdownNow()
        this.cleanExpireMsgExecutors.shutdown();
    }
}

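So my conclusion is that the InterruptedException during service shutdown is caused by shutdownNow() being called directly, instead of only shutdown() as in 4.3.2.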

Solution

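From the analysis above, the exception comes from shutdownNow() being called immediately, so we should be able to fix it by raising awaitTerminateMillis, that is, the DefaultMQPushConsumer.awaitTerminationMillisWhenShutdown parameter.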
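Why a larger value helps can be sketched with the JDK alone (hypothetical class name; timings chosen for illustration). The helper copies the steps of ThreadUtils.shutdownGracefully: it waits up to the timeout before falling back to shutdownNow(). A message that finishes within the timeout is no longer interrupted, but one that runs longer still is, so, as an assumption, the value should cover how long in-flight messages typically take.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AwaitTimeoutDemo {

    // Same steps as ThreadUtils.shutdownGracefully quoted earlier, minus the logging.
    static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();
                executor.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if a message needing workMillis was interrupted by a shutdown with the given timeout. */
    static boolean interrupted(long workMillis, long awaitTerminateMillis) {
        ExecutorService consumeExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean wasInterrupted = new AtomicBoolean(false);
        consumeExecutor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(workMillis); // simulated message processing
            } catch (InterruptedException e) {
                wasInterrupted.set(true);
            }
        });
        try {
            started.await(); // the message is in flight when shutdown begins
            shutdownGracefully(consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
            consumeExecutor.awaitTermination(5, TimeUnit.SECONDS); // let the demo finish cleanly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wasInterrupted.get();
    }

    public static void main(String[] args) {
        // Message shorter than the timeout: awaitTermination returns true, shutdownNow() is skipped.
        System.out.println("100 ms message, 1000 ms timeout -> interrupted = " + interrupted(100, 1000));
        // Message longer than the timeout: shutdownNow() still interrupts it.
        System.out.println("2000 ms message, 300 ms timeout -> interrupted = " + interrupted(2000, 300));
    }
}
```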

As far as I can tell, there is currently no official way to configure this parameter globally, so here are two ideas.

Approach 1: RocketMQPushConsumerLifecycleListener

// org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    @Override
    public void afterPropertiesSet() throws Exception {
        initRocketMQPushConsumer();
        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);
    }

    private void initRocketMQPushConsumer() throws MQClientException {
        // some code omitted
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
        // consumer configuration code omitted
        // Note: prepareStart is called back here
        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }
    }
}

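So while DefaultRocketMQListenerContainer is being initialized, right after it creates the consumer, it calls prepareStart(consumer) on the RocketMQListener (or RocketMQReplyListener) if that listener implements RocketMQPushConsumerLifecycleListener.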

From there it's simple:

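import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(nameServer = "${spring.rocketmq.nameServer}",
        topic = "${topic}",
        consumerGroup = "${group}")
public class TestConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(String msg) {
        // do something
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // set it: wait up to 1000 ms for in-flight messages before shutdownNow() is called
        consumer.setAwaitTerminationMillisWhenShutdown(1000);
    }
}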

This can be pulled out into a common base class that every consumer extends.

Approach 2: DefaultRocketMQListenerContainer.getConsumer

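DefaultRocketMQListenerContainer is registered as a bean; the implementation lives in org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration, which I won't analyze here. We can try fetching every DefaultRocketMQListenerContainer and calling its getConsumer() method. Sample code follows; how and when to trigger it is left for you to work out and refine, and is not covered in this article.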

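public void set() {
    // getAllBeans() is a placeholder: obtain every DefaultRocketMQListenerContainer bean yourself,
    // e.g. from ApplicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class)
    List<DefaultRocketMQListenerContainer> containers = getAllBeans();
    for (DefaultRocketMQListenerContainer c : containers) {
        c.getConsumer().setAwaitTerminationMillisWhenShutdown(1000);
    }
}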

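Finally, the above is only my own analysis and may not be correct. With the first approach the InterruptedException did stop appearing, though I can't rule out that this is a coincidence.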

Comments and discussion are welcome.
