rocketmq官方示例超时报错处理

Posted bug本ba

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq官方示例超时报错处理相关的知识,希望对你有一定的参考价值。

1.异常信息

版本说明:
mq服务器版本:4.9.2
java客户端:4.9.2

1.1 RemotingTimeoutException: invokeSync call timeout

Caused by: org.apache.rocketmq.remoting.exception.RemotingTimeoutException: invokeSync call timeout
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:375)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357)
	at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
	... 6 more

1.2 RemotingTooMuchRequestException: sendDefaultImpl call timeout

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:683)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
	at hh.simple.SyncSendMsg.main(SyncSendMsg.java:35)

2.处理方法

新建自定义MyProducer 类,继承DefaultMQProducer,重写cloneClientConfig,新增实例化MyProducer对象时用到的构造方法
MyProducer 代码如下:
public class MyProducer extends DefaultMQProducer 

    public MyProducer(String producerGroup) 
        super(producerGroup);
    

    @Override
    public ClientConfig cloneClientConfig() 
        ClientConfig config = super.cloneClientConfig();
        config.setMqClientApiTimeout(super.getMqClientApiTimeout());
        return config;
    

main方法(发送消息的代码如下):

public static void main(String[] args) throws MQClientException 
        DefaultMQProducer producer = new MyProducer("test-group");
        // 设置超时时间
        producer.setMqClientApiTimeout(1000*10);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        System.out.printf("SyncProducer Started.%n");
        // topic, tags, body
        Message msg = new Message("test-topic","test-tag", "测试消息1234".getBytes(Charset.forName("utf-8")));
        try 
            // 此处的timeout是等待broker响应信息的时长,设置为10s,以防超时
            SendResult result = producer.send(msg, 1000*10);
            System.out.printf("%s%n", result);
         catch (RemotingException e) 
            e.printStackTrace();
         catch (MQBrokerException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
        producer.shutdown();
    

3.排查处理过程

根据报错信息,可以定位到报错源码位置:
MQClientAPIImplgetTopicRouteInfoFromNameServer方法的1367行

容易推断出,是因为此方法执行时间超过了给定的timeoutMillis,于是便返回上层,追究参数是从哪里来的:
查找此方法在哪里调用,经排查发现在
MQClientInstanceupdateTopicRouteInfoFromNameServer方法中622行,
那么问题来了,这个clientConfig是从哪里来的呢?请继续往下看

经排查,这个参数是从MQClientInstance的构造函数传入的

进一步排查,在MQClientManagergetOrCreateMQClientInstance方法的52行调用了上述构造函数并传入由clientConfig.cloneClientConfig()返回的ClientConfig对象,那么这个clientConfig到底是啥,继续往下:

标记一

进一步排查,在DefaultMQProducerImpl中start方法的201行,原来是defaultMQProducer,应该就是我们new出来的DefaultMQProducer对象,为了证实继续往下:

看看是哪个地方调用了此构造函数,继续往下:

果然,就是我们的DefaultMQProducer对象,所以之前的"那么这个clientConfig到底是啥"",答案就是:clientConfigDefaultMQProducer对象。

那么知道clientConfig就是DefaultMQProducer对象后,再回到标记一处:进入cloneClientConfig方法

就是重新new了一个ClientConfig对象,把配置参数进行一一赋值,仔细观察这里面没有mqClientApiTimeout这个参数,意思就是除了这里面赋值外的其他参数全部会恢复为ClientConfig类中的参数默认值。

即:就算我们进行如下操作,在producer中设置了mqClientApiTimeout参数,经过上述操作后,配置也会被清空

DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置超时时间
producer.setMqClientApiTimeout(1000*10);

故我们只有重写cloneClientConfig方法,将mqClientApiTimeout参数设置进新的ClientConfig对象中。到此问题得到解决,这就是为什么要继承DefaultMQProducer并重写cloneClientConfig方法的原因。

对于1.2的RemotingTooMuchRequestException: sendDefaultImpl call timeout报错,调用含超时参数的方法即可,具体排查可进入下述send方法逐步查看源码。

// 此处的timeout是等待broker响应信息的时长,设置为10s,以防超时
            SendResult result = producer.send(msg, 1000*10);
            System.out.printf("%s%n", result);

重新运行main方法,超时异常不再出现,若还报超时,可以尝试将超时时间增加。

DefaultMQPushConsumer也可按照上述方法进行设置即可

以上是关于rocketmq官方示例超时报错处理的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot集成RocketMQ报错:Bad annotation definition in @ExtRocketMQTemplateConfiguration...

五.RocketMQ极简入门-RocketMQ延迟消息

RocketMQ报错

nginx lua socket timeout 问题处理

RocketMQ初入门踩坑记

RocketMq-简单示例