springboot整合RocketMq(非事务)

Posted 一睡万古

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合RocketMq(非事务)相关的知识,希望对你有一定的参考价值。

1、配置文件

1、yml配置文件

rocketmq: #mq配置
  producer:
    iseffect: true
    type: default # (transaction,default) transaction:TransactionMQProducer; default:DefaultMQProducer
    groupName: testzjlGroup
    topicName: test_topic
    namesrvAddr: 192.168.244.128:9876
  consumer:
    iseffect: true
    type: default # (transaction,default) transaction:TransactionMQProducer; default:DefaultMQProducer
    groupName: testzjlGroup
    topicName: test_topic
    namesrvAddr: 192.168.244.128:9876

2、对应的java类

技术图片
package com.gofun.customer.mq;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "rocketmq.producer")
public class RocketMqProducerProperties {

    private Boolean iseffect;
    private String type;
    private String groupName;
    private String topicName;
    private String namesrvAddr;

    public Boolean getIseffect() {
        return iseffect;
    }

    public void setIseffect(Boolean iseffect) {
        this.iseffect = iseffect;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}
View Code

2、DefaultMQProducer生产者

1、生产者配置

package com.gofun.customer.mq;


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(RocketMqProducerProperties.class)
@ConditionalOnProperty(prefix = "rocketmq.producer", name = "iseffect", havingValue = "true")
public class RocketMqConfig {
    @Autowired
    private RocketMqProducerProperties rocketMqProperties;

    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", name = "type", havingValue = "default")
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(rocketMqProperties.getGroupName());
        defaultMQProducer.setNamesrvAddr(rocketMqProperties.getNamesrvAddr());
        defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setRetryTimesWhenSendAsyncFailed(10);
        defaultMQProducer.start();
        return defaultMQProducer;
    }

}
  1. 启动应用创建生产者
  2. 使用配置文件为RocketMqProducerProperties类
  3. 配置文件iseffect = true时生效该配置
  4. 配置文件 type=default 时生效 DefaultMQProducer 生产者

2、生产者工具类

技术图片
package com.gofun.customer.mq;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RocketMqProducer {
    @Autowired
    private DefaultMQProducer defaultMQProducer;
    @Autowired
    private RocketMqProducerProperties rocketMqProperties;

    public SendResult send(String topic, String tag, Object obj) {
        String body = null;
        if(obj == null){
            return null;
        }
        if(obj instanceof String){
            body = (String) obj;
        }else{
            body = JSON.toJSONString(obj);
        }
        Message msg = new Message(topic, tag, body.getBytes());
        SendResult sendResult = null;
        try {
            sendResult = defaultMQProducer.send(msg);
        } catch (Exception e) {
        }
        return sendResult;
    }

    /**
     * 延时消息
     * @param topic
     * @param tag
     * @param obj
     * @param delayTimeLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m  7m  8m  9m  10m  20m 30m  1h  2h
     *                       1  2  3   4   5  6  7  8  9  10  11  12  13  14   15  16  17  18
     * @return
     */
    public SendResult send(String topic, String tag, Object obj,int delayTimeLevel) {
        String body = null;
        if(obj == null){
            return null;
        }
        if(obj instanceof String){
            body = (String) obj;
        }else{
            body = JSON.toJSONString(obj);
        }
        Message msg = new Message(topic, tag, body.getBytes());
        msg.setDelayTimeLevel(delayTimeLevel);
        SendResult sendResult = null;
        try {
            sendResult = defaultMQProducer.send(msg);
        } catch (Exception e) {
        }
        return sendResult;
    }

    public SendResult send(String tag, Object obj) {
        return send(rocketMqProperties.getTopicName(), tag, obj);
    }

}
View Code

3、生产者发送消息

1、引入工具类调用 发送方法

@Autowired
private RocketMqProducer rocketMqProducer;
rocketMqProducer.send("testTag","测试mq发送消息。。。。");

4、问题

注意:
  1. mq设置autoCreateTopicEnable=false时需要手动创建topic
  2. 启动后后台一直报打印日志如:closeChannel: close the connection to remote address[] result: true  说明namesrvAddr 的ip 或 端口号配置错误
  3. 启动后后台一直报打印日志如:closeChannel: close the connection to remote address[192.168.244.128:9876] result: true  可能是jar包错误
  4. 要引入fastjson的jar包

3、消费者

1、DefaultMQPullConsumer 消费者

private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();

    public void testDefaultMQPullConsumer() throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rocketMqConsumerProperties.getGroupName());
        consumer.setNamesrvAddr("192.168.244.128:9876;192.168.244.130:9876");
        consumer.start();
        // 从指定topic中拉取所有消息队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
        for(MessageQueue mq:mqs){
            try {
                // 获取消息的offset,指定从store中获取
                long offset = consumer.fetchConsumeOffset(mq,true);
                System.out.println("consumer from the queue:"+mq+":"+offset);
                while(true){
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
                    switch(pullResult.getPullStatus()){
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.shutdown();
    }

    // 保存上次消费的消息下标
    private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) {
        OFFSE_TABLE.put(mq, nextBeginOffset);
    }

    // 获取上次消费的消息的下标
    private static Long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if(offset != null){
            return offset;
        }
        return 0l;
    }

2、DefaultMQPushConsumer 消费者模板方法

RocketMqConsumerProperties 配置类 参见 消费者配置类。

package com.gofun.customer.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

public abstract class RocketMqDefaultConsumer {
    @Autowired
    private RocketMqConsumerProperties rocketMqConsumerProperties;

    public void listener(String topic, String tag) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketMqConsumerProperties.getGroupName());
        consumer.setNamesrvAddr(rocketMqConsumerProperties.getNamesrvAddr());
        consumer.subscribe(topic, tag);
        // 开启内部类实现监听
//        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> RocketMqDefaultConsumer.this.dealBody(list));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                return RocketMqDefaultConsumer.this.dealBody(list);
            }
        });
        consumer.start();
    }

    // 处理body的业务
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
}
  1. 消费者需要继承此抽象类
  2. 传入topic 和 tag,添加监听, 开启消费者
  3. 定义抽象函数处理消息,实现类需要自已实现处理函数内容

3、DefaultMQPushConsumer 消费者实现

package com.gofun.customer.controller.test;

import com.gofun.customer.mq.RocketMqDefaultConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

@Component
public class TestConsumer extends RocketMqDefaultConsumer implements InitializingBean {
    private final String testtopic = "test_topic";
    private final String testflag = "testTag";

    @Override
    public void afterPropertiesSet() throws Exception {
        super.listener(testtopic, testflag);
    }

    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) {
        if (!CollectionUtils.isEmpty(msgs)) {
            for (MessageExt msg : msgs) {
                byte[] body = msg.getBody();
                String s = new String(body);
                System.out.println("这是rocketmq消费者消费的消息:"+s);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
  1. 继承模板类定义自己的topic和tag
  2. 实现处理消息的方法
  3. 实现接口InitializingBean ,加载完成自动调用 afterPropertiesSet方法,父类模板添加监听

参考https://www.jianshu.com/p/5260a2739d80

以上是关于springboot整合RocketMq(非事务)的主要内容,如果未能解决你的问题,请参考以下文章

11 SpringBoot整合RocketMQ实现事务消息

SpringBoot整合RocketMQ

SpringBoot(17)---SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

RocketMQ(二十四)整合SpringBoot

SpringBoot整合RocketMQ