SpringBoot2.X 整合 RocketMQ4.X

Posted 认真对待世界的小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot2.X 整合 RocketMQ4.X相关的知识,希望对你有一定的参考价值。

开发生产者代码

第一步:创建很普通的 SpringBoot 项目

第二步:加入相关依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

第三步:写代码

PayProducer 类如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private String nameServerAddr = "192.168.0.104:9876";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");

        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController 类如下所示:

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    private static  final String topic = "pay_test_topic";

    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() );
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

第四步:测试

通过可视化管理后台查看消息

 

 

Message对象

  • topic:主题名称
  • tag:标签,用于过滤
  • key:消息唯一标示,可以是业务字段组合
  • body:消息体,字节数组

注意:发送消息到 Broker 前,需要判断是否有此 Topic。启动 Broker 的时候,本地环境建议开启自动创建 Topic,生产环境建议关闭自动化创建 Topic。建议先手工创建 Topic,如果靠程序自动创建,然后再投递消息,会出现延迟情况。自动创建topic: autoCreateTopicEnable=true 无效原因:客户端版本要和服务端版本保持一致。

概念模型: 一个 Topic 下面对应多个 Queue,可以在创建 Topic 时指定,如订单类 Topic。

常见错误一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker 
新增配置:conf/broker.conf  (属性名称brokerIP1=broker所在的公网ip地址 )
新增这个配置:brokerIP1=120.76.62.13  

启动命令:nohup sh bin/mqbroker -n localhost:9876  -c ./conf/broker.conf &

常见错误二

MQClientException: No route info of this topic, TopicTest1
原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通
解决:
通过 sh bin/mqbroker -m  查看配置
autoCreateTopicEnable=true 则自动创建topic

Centos7关闭防火墙  systemctl stop firewalld

常见错误三

控制台查看不了数据,提示连接 10909错误

原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909

解决:阿里云安全组需要增加一个端口 10909

其他错误:

https://blog.csdn.net/qq_14853889/article/details/81053145
https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156
https://blog.csdn.net/wangmx1993328/article/details/81588217

 

开发消费者代码

接着上面的工程,直接上代码,PayConsumer 类如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;

    private String CONSUMER_GROUP = "pay_consumer_group";
    private String NAME_SERVER = "192.168.0.104:9876";
    private String TOPIC = "pay_test_topic";

    public PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(this.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(this.TOPIC, "*");

//        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//            try {
//                Message msg = msgs.get(0);
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//                String topic = msg.getTopic();
//                String body = new String(msg.getBody(), "utf-8");
//                String tags = msg.getTags();
//                String keys = msg.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {

                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }
}

注释掉的部分采用 Lambda 表达式写法,效果是一样的。

常见问题

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 

2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, 	MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
	1、设置producer:  producer.setVipChannelEnabled(false);
	2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
		namesrvAddr = 192.168.0.101:9876
		brokerIP1 = 192.168.0.101

4、DESC: service not available now, maybe disk full, CL:
	解决:修改启动脚本runbroker.sh,在里面增加一句话即可:		
	JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
	(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
	
常见问题处理
	https://blog.csdn.net/sqzhao/article/details/54834761
	https://blog.csdn.net/mayifan0/article/details/67633729
	https://blog.csdn.net/a906423355/article/details/78192828

 

以上是关于SpringBoot2.X 整合 RocketMQ4.X的主要内容,如果未能解决你的问题,请参考以下文章

Springboot2.x最全整合系列(持续更新)

springboot2.x整合kafka

SpringBoot2.x整合WebSoket

springboot2.x整合quartz2.x.md

SpringBoot2.x 整合Redis和使用Redis缓存

SpringBoot2.X整合Redis