使用Spring Boot Starter开发RocketMQ
Posted rhwayfunn
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spring Boot Starter开发RocketMQ相关的知识,希望对你有一定的参考价值。
RocketMQ最早是阿里巴巴开源的MQ中间件,在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。现已捐献给Apache,目前是Apache下的顶级项目。捐献后的最新版本是4.1.0-incubating
。
在实际场景中使用MQ打交道最多的是发送消息和消费消息。RocketMQ支持广播消费和集群消费,广播消费是指每隔consumer实例都会收到消息,一条消息可以被多个消费者实例处理;集群消费是指一条消息只能被一个消费者实例消费。
广播消费的使用场景是每个消费者都关心该消息,比如缓存变更消息,因为涉及失效,所以每个消费者都需要更新本地缓存,清除老数据
集群消费的使用场景是只要一个消费者处理过了就行,比如在淘宝的下单消息,理论上只需要一个实例处理就可以,重复处理有可能有问题
消费模式:
/**
* Message model
*
*/
public enum MessageModel
/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING");
private String modeCN;
MessageModel(String modeCN)
this.modeCN = modeCN;
public String getModeCN()
return modeCN;
Spring Boot RocketMQ Starter
spring-boot-rocketmq-starter
的github地址,可以直接从Maven中央仓库搜索到:
https://github.com/rhwayfun/spring-boot-rocketmq-starter
使用Starter开发RocketMQ非常简单,添加依赖如下:
<dependency>
<groupId>io.github.rhwayfun</groupId>
<artifactId>spring-boot-rocketmq-starter</artifactId>
<version>0.0.3.RELEASE</version>
</dependency>
配置namerServer和producerGroup:
# Apache RocketMQ
spring.rocketmq.nameServer=localhost:9876
spring.rocketmq.producer-group-name=spring-boot-test-producer-group
消费消息:
继承AbstractRocketMqConsumer
,有两个泛型参数需要指定,第一个是消费的Topic
,第二个是消息的内容Content
,需要继承RocketMqContent
,这个参数通常是发送端指定的,所以在消费端指定后就可以获取发送端的内容了,RocketMqContent
使用了FastJSON进行序列化,目前暂不支持自定义。
RocketMqContent
定义如下:
public class RocketMqContent implements Serializable
private static final long serialVersionUID = 1L;
/**
* FastJSON serialize
*
* @return content json string
*/
@Override
public String toString()
return JSON.toJSONString(this, SerializerFeature.NotWriteDefaultValue);
具体的消费者:
@Component
public class DemoMqConsumer
extends AbstractRocketMqConsumer<DemoMqTopic, DemoMqContent>
@Override
public boolean consumeMsg(DemoMqContent content, MessageExt msg)
System.out.println(new Date() + ", " + content);
return true;
@Override
public Map<String, Set<String>> subscribeTopicTags()
Map<String, Set<String>> map = new HashMap<>();
Set<String> tags = new HashSet<>();
tags.add("test-tag");
map.put("test-topic", tags);
return map;
@Override
public String getConsumerGroup()
return "test-consumer-group";
public class DemoMqTopic implements RocketMqTopic
@Override
public String getTopic()
return "test-topic";
public class DemoMqContent extends RocketMqContent
private int id;
private String desc;
public int getId()
return id;
public void setId(int id)
this.id = id;
public String getDesc()
return desc;
public void setDesc(String desc)
this.desc = desc;
启动应用后就可以说到消息了,日志如下:
发送消息只需要注入DefaultRocketMqProducer
这个bean就可以直接发送消息了,这个类没什么特殊的,就是封装了一层DefaultMQProducer
,加了一些发送消息的方法,具体使用如下:
@Component
public class DemoRocketMqProducerExample
@Resource
private DefaultRocketMqProducer producer;
@PostConstruct
public void execute()
new Timer().schedule(new TimerTask()
@Override
public void run()
DemoRocketMqContent content = new DemoRocketMqContent();
content.setCityId(1);
content.setDesc("城市");
Message msg = new Message("TopicA", "TagA", content.toString().getBytes());
boolean sendResult = producer.sendMsg(msg);
System.out.println("发送结果:" + sendResult);
, 0, 1000);
结果如下:
如果想扩展发送消息的方法,直接继承这个类即可,目前支持发送三种消息:有返回值、one way、延迟消息。
更详细的使用案例在
spring-boot-rocketmq-starter-example
spring-boot-rocketmq-starter
可以直接从Maven中央仓库下载,地址如下:
spring-boot-rocketmq-starter
的github地址:https://github.com/rhwayfun/spring-boot-rocketmq-starter
以上是关于使用Spring Boot Starter开发RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章
java Spring Cloud+Spring boot+mybatis企业快速开发架构之SpringCloud-Spring Boot Starter的介绍及使用