RocketMQ消息
Posted guxiaohai_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ消息相关的知识,希望对你有一定的参考价值。
一、 MQ背景&选型
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
- 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
- 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持18个级别的延迟消息(rabbitmq和kafka不支持)
- 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
- 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
- 支持重复消费(rabbitmq不支持,kafka支持)
二:RocketMQ基本概念
Client端
- Producer Group 一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致
- Consumer Group 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致
Server端
- Broker 消息中转角色,负责存储消息,转发消息,这里就是RocketMQ Server
- Topic 消息的主题,用于定义并在服务端配置,消费者可以按照主题进行订阅,也就是消息分类,通常一个系统一个Topic
- Message 在生产者、消费者、服务器之间传递的消息,一个message必须属于一个Topic 消息是要传递的信息。邮件中必须包含一个主题,该主题可以解释为要发送给您的信的地址。消息还可能具有可选标签和额外的键值对。例如,您可以为消息设置业务密钥,然后在代理服务器上查找消息以在开发过程中诊断问题。
- Namesrver 一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要是接收broker的注册,接收客户端的路由请求并返回路由信息
- Offset 偏移量,消费者拉取消息时需要知道上一次消费到了什么位置, 这一次从哪里开始
- Partition 分区,Topic物理上的分组,一个Topic可以分为多个分区,每个分区是一一个有序的队列。 分区中的每条消息都会给分配一个有序的ID,也就是偏移量,保证了顺序,消费的正确性
- Tag 用于对消息进行过滤,理解为message的标记,同一业务不同目的的message可以用相同的topic但是 可以用不同的tag来区分
- key 消息的KEY字段是为了唯- -表示消息的,方便查问题,不是说必须设置,只是说设置为了方便开发和运维定位问题。 比如:这个KEY可以是订单ID等
三:实战工具类
- 生产者:
public class Producer {
//实例化对象
private static Producer producer = null;
//对象构造方法
private Producer(){};
/**
* @Author: gwh
* @Description: 单例构造方法
* @param topic 主题
* @param tags 标签
* @param message 内容
* @param groupName 组名
* @param address 地址
* @param key key值
* @Date: 2021-07-06 16:53
*/
public static Producer getInstance(String topic, String tags, String message, String groupName, String address,String key) throws Exception{
//初始化实例化对象
if(producer == null){
synchronized (Producer.class){
if(producer == null){
producer = new Producer();
}
}
}
// TODO 1 创建消息生产者,指定生成组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupName);
// TODO 2 指定NameServer的地址
defaultMQProducer.setNamesrvAddr(address);
// TODO 3 启动生产者
defaultMQProducer.start();
// TODO 4 构建消息对象,主要是设置消息的主题、标签、内容
Message resultMessage = new Message(topic, tags, message.getBytes());
//key值
if(null != key && !key.equals("")){
resultMessage.setKeys(key);
}
// TODO 5 发送消息
defaultMQProducer.send(resultMessage);
// TODO 6 关闭生产者
defaultMQProducer.shutdown();
return producer;
}
}
- 消费者:
public class Consumer {
//实例化对象
private static Consumer consumer = null;
//对象构造方法
private Consumer(){};
/**
* @Author: gwh
* @Description: 单例构造方法
* @param topic 订阅主题
* @param groupName 组名
* @param address 地址
* @Date: 2021-07-06 16:53
*/
public static Consumer getInstance(String topic, String groupName, String address) throws Exception{
//初始化实例化对象
if(consumer == null){
synchronized (Consumer.class){
if(consumer == null){
consumer = new Consumer();
}
}
}
// TODO 1 创建消费者,指定所属的消费者组名
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(groupName);
// TODO 2 指定NameServer的地址
defaultMQPushConsumer.setNamesrvAddr(address);
// TODO 3 指定消费者订阅的主题和标签
defaultMQPushConsumer.subscribe(topic, "*");
// TODO 4 进行订阅:注册回调函数,编写处理消息的逻辑
defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
try {
for (MessageExt msg : list) {
//....处理回调消息
}
} catch (Throwable throwable) {
throwable.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// TODO 5 启动消费者
defaultMQPushConsumer.start();
return consumer;
}
}
以上是关于RocketMQ消息的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息