springboot整合RocketMQ的各种消息类型,生产者,消费者
Posted 宇宙磅礴而冷漠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合RocketMQ的各种消息类型,生产者,消费者相关的知识,希望对你有一定的参考价值。
文章目录
Springboot整合使用
pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml
group: producer-demo1指定生产者组名
rocketmq:
name-server: 192.168.126.100:9876;192.168.64.152:9876
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group-1
#消息发送的超时时间,默认为3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认4096
compress-message-body-threshold: 4096
#最大的消息限制 默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker才生效。
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3
整合使用
消息类型不同比如string,pojo实体类型需要分不同的topic,同consumer-group需要对应一种topic,consumer不管配置什么topic,tag,最后都是以group为准
(如果某个consumer的group名字与group-name1相同,但这个consumer的topic信息与group-name1这个组的topic不同,这会导致group-name1这个组的consumer不能接收全部消息,也就是会导致部分消息无法被消费,所以不要乱操作)
(如果group不同但是topic和tag都相同,那么所有group都会消费同样的消息,有多少个group就消费多少次)
(一台机器可部署多个consumer,需要保证他们的group-name不同,通常topic与group一一对应,所以topic也不同)
为方便查看,示例关键代码
某个service层的实现类MQServiceImpl
package com.ro.service;
import com.ro.pojo.User;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
public class MQServiceImpl implements MQService{
Logger log= LoggerFactory.getLogger(MQServiceImpl.class);
@Autowired
private RocketMQTemplate mqTemplate;
/**
* 同步消息
* 消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
*/
@Override
public void syncMQMessageSend() {
mqTemplate.syncSend("topic1:tag1", "hello1");
mqTemplate.syncSend("topic1:tag1", "hello2");
SendResult s2=mqTemplate.syncSend("topic1:tag1", "hello3");
log.info("3条同步消息String类型已发送:topic:topic1,tag:tag1:{}",s2);
User user=new User("tom",100);
SendResult result=mqTemplate.syncSend("topic2:tag1", MessageBuilder.withPayload(user).build());
//可以简写成以下,直接传入pojo对象
SendResult result2=mqTemplate.syncSend("topic2:tag1", user.setName("lily").setAge(200));
log.info("object类型同步消息发送结果:{},{}",result,result2);
}
/**
* 异步消息
* 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
* 关键实现异步发送回调接口(SendCallback)
* 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
* 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
*/
@Override
public void asyncMQMessageSend() {
User user=new User("tom",100);
mqTemplate.asyncSend("topic3:tag1", user, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功:{}",sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步消息发送失败:{}",throwable.getMessage());
}
});
}
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*/
@Override
public void oneWaySendMQMessageSend() {
User user=new User("tom",100);
mqTemplate.sendOneWay("topic4:tag1", user);
log.info("单向消息已发送");
}
/**
* 延迟消息
* rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
* RocketMQ 目前只支持固定精度的定时消息。
* 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟的底层方法是用定时任务实现的。
*/
@Override
public void delayedSendMQMessageSend() {
User user=new User("tom",100);
SendCallback sc=new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送异步延时消息成功");
}
@Override
public void onException(Throwable throwable) {
log.info("发送异步延时消息失败:{}",throwable.getMessage());
}
};
Message<User> message=MessageBuilder.withPayload(user).build();
//异步延时
mqTemplate.asyncSend("topic5:tag1", message, sc, 3000, 3);
//同步延时(少一个sendCallback)
SendResult result=mqTemplate.syncSend("topic5:tag1", message, 3000, 3);
log.info("发送同步延时消息成功:{}",result);
}
/**
* 顺序消息
*使用hashcode对topic队列数量取模得到对应队列
* 使消息按照顺序被消费,顺序与生产出来的顺序一致
* 比如同一个订单生成,付费顺序需要一致,可以按照订单id来当作hashkey
*/
@Override
public void orderlyMQMessageSend() {
String s1[]={"tom","1"};
String s2[]={"klee我和tom在同一个消费者被消费,而且在tom之后","1"};
String s3[]={"lily我可能不会和tom在同一个消费者被消费","2"};
//同步顺序,也可以是其他类型比如异步顺序,单向顺序
mqTemplate.syncSendOrderly("topic6:tag1", s1[0],s1[1]);
mqTemplate.syncSendOrderly("topic6:tag1", s2[0],s2[1]);
mqTemplate.syncSendOrderly("topic6:tag1", s3[0],s3[1]);
log.info("单向消息已发送");
}
/**
* 过滤消息
* Tag 过滤
* Sql 过滤
* Sql类型语法:
* 数值比较,比如:>,>=,<,<=,BETWEEN,=;
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;
*/
@Override
public void selectorMQSend() {
//Tag过滤就是在发送参数上指定,比如topic1:tag1就指定了tag1,这种光topic1不指定就是所有tag
//这里使用sql92
User user=new User("tom",16);
User user2=new User("klee",9);
Message<User> message=MessageBuilder
.withPayload(user)
.setHeader("age", user.getAge())
.build();
Message<User> message2=MessageBuilder
.withPayload(user)
.setHeader("age", user2.getAge())
.build();
mqTemplate.syncSend("topic10", message);//age=16,消费者设置sql92过滤(header)头数据age=9
mqTemplate.syncSend("topic10", message2);//age=9
log.info("添加age头信息的过滤消息发送完毕");
}
/**
* 分布式事物消息
*生产者需要一个监听自己的类
*/
@Override
public void transactionMQSend() {
User user=new User("klee",9);
Message<User> message=MessageBuilder
.withPayload(user)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
.build();
TransactionSendResult result=mqTemplate.sendMessageInTransaction("topic15:tag1", message, null);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&&
result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息发送成功");
}
log.info("事物消息发送结果:{}",result);
}
}
同步消息消费者
package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-1",topic = "topic1",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<String> {
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(String s) {
log.info("consumer-1 收到string类型消息:{}",s);
}
}
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-2",topic = "topic2",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<User> {
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(User u) {
log.info("consumer-2 收到user类型消息:{}",u);
}
}
异步消息消费者
代码与同步消费者相同
单向消息消费者
代码与同步消费者相同
延时消息消费者
代码与同步消费者相同
顺序消息消费者
只需要在之前的基础上指定consumeMode = ConsumeMode.ORDERLY
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY,consumerGroup = "consumer-orderly",topic = "topic6",selectorExpression = "tag1")
public class MQConsumerOrderly implements RocketMQListener<String> {
Logger log= LoggerFactory.getLogger(MQConsumerOrderly.class);
@Override
public void onMessage(String s) {
log.info("接收到顺序消息了:{}",s);
}
}
sql92过滤消息消费者
更多rocketmq集群方案安装配置请查看我的其他文章
https://blog.csdn.net/UnicornRe/article/details/117745226
需要在安装的配置文件xxx.conf加入
#支持sql92
enablePropertyFilter=true
sql语法
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
//selectorExpression指明了只能接收消息属性(header)中age的消息。
@RocketMQMessageListener(
selectorType = SelectorType.SQL92,
consumerGroup = "consumer-selector",
topic = "topic10",
selectorExpression = "age = 9")
public class MQConsumer implements RocketMQListener<User> {
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(User u) {
log.info("接收到sql过滤消息了:{}",u);
}
}
事物消息消费者
发送方向 MQ 服务端发送消息。
MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
producer需要一个监听类
package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RocketMQTransactionListener
public class TransactionListner implements RocketMQLocalTransactionListener {
Logger log= LoggerFactory.getLogger(TransactionListner.class);
//保障线程安全且保证高性能的hashmap,用来记录执行结果
private static Map<String,RocketMQLocalTransactionState> transStateMap=new ConcurrentHashMap<>();
//执行本地事物
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//方法里的object o这个对象是生产者发送消息方法最后一个参数的值
String transId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("事物id:{}",transId);
try RocketMQ 整合SpringBoot发送事务消息