万字精华总结RocketMQ的常见用法(案例+图)
Posted 程序猿小亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了万字精华总结RocketMQ的常见用法(案例+图)相关的知识,希望对你有一定的参考价值。
概述
上篇博文,我们介绍了什么是RocketMQ,以及如何安装单机版的RocketMQ。在安装的过程了,我们主要安装了两个服务,NameServer和Broker。在发送和接收消息时,又接触了两个概念,生产者和消费者。
那这些又代表什么含义呢?
对于单机版本的RocketMQ架构,如下图所示:
主要分为四部分:
- 名字服务(Name Server)
Name Server充当路由消息的提供者。生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
- 代理服务器(Broker Server)
Broker Server负责存储消息、转发消息。Broker在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
- 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
- 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
对于上面的学习,我们知道了RocketMQ的核心模块以及相应的概念。那么,RocketMQ都有哪些发送消息的方式呢,又如何使用,使用的场景是什么,又是如何消费的?
常见用法
在项目中添加MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>
</dependency>
1、基本消息
1.1消息发送
-
在基本消息发送中,我们使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
-
使用RocketMQ两个不同模式,来消费接收到的消息。
1、同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer
public static void main(String[] args) throws Exception
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++)
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//5.发送同步消息,将消息发送给其中一个broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
//6.关闭生产者producer
producer.shutdown();
上面的案例中设计到两个陌生的概念,含义如下所示:
生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
2、异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer
public static void main(String[] args) throws Exception
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++)
try
final int index = i;
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送异步消息,SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
@Override
public void onException(Throwable e)
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
);
catch (Exception e)
e.printStackTrace();
countDownLatch.await(5, TimeUnit.SECONDS);
//6.关闭生产者producer
producer.shutdown();
keys:Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息对消息关键字的提取方便查询。
3、单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
public class OneWayProducer
public static void main(String[] args) throws Exception, MQBrokerException
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//2.指定Nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++)
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TopicTest", "TagA", ("Hello World,单向消息" + i).getBytes());
//5.发送单向消息
producer.sendOneway(msg);
//线程睡1秒
TimeUnit.SECONDS.sleep(5);
//6.关闭生产者producer
producer.shutdown();
1.2消息消费
此时,RocketMQ中已经有我们需要发送的消息了,我们使用RocketMQ来消费队列中的消息。接收消息有两种模式:
- 负载均衡模式(Clustering)
- 广播模式(Broadcasting)
启动多个消费者,最直接的区别:模式不同,消费的消息不同。
1、负载均衡模式
默认模式,消费者采用负载均衡方式消费消息,相同消费者组的每个消费者共同消费队列中的消息即每个Consumer实例平均分摊消息,每个消费者处理的消息不同。消费进度存储在服务端。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-14 15:22
*
* 异步消息,同步消息,单向消息 - 消费者 - 负载均衡模式
*/
public class ClusteringConsumer
public static void main(String[] args) throws InterruptedException, MQClientException
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
//订阅指定 Topic 下的所有消息
consumer.subscribe("TopicTest", "*");
//负载均衡模式,默认
consumer.setMessageModel(MessageModel.CLUSTERING);
// Register callback to execute on arrival of messages fetched from brokers.
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 启动消费者
consumer.start();
System.out.println("消息消费者已启动");
2、广播模式
消费者采用广播的方式消费消息,相同Consumer Group的每个消费者消费的消息都是相同的。消费进度存储在消费者本地。
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 19:02
*
*
* 异步消息,同步消息,单向消息 - 消费者 - 广播模式
*/
public class BroadcastConsumer
public static void main(String[] args) throws Exception
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅指定 Topic 下的所有消息
consumer.subscribe("TopicTest", "*");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
// System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (msgs != null)
for (MessageExt ext : msgs)
try
System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
catch (UnsupportedEncodingException e)
e.printStackTrace();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
System.out.println("消息消费者已启动");
消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
2、顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
-
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
实现方式:
当发送和消费参与的queue只有一个
适用场景:
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
-
分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
实现方式:
如果多个queue参与,按照Sharding key选择队列,则为分区有序,即相对每个queue,消息都是有序的。
适用场景:
性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
2.1顺序消息生产
/**
* @PROJECT_NAME: SpringCloud-Learning
* @USER: yuliang
* @DESCRIPTION:
* @DATE: 2021-04-19 11:17
*
* 顺序消息-生产者
*/
public class ProducerInOrder
public static void main(String[] args) throws Exception
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]"TagA", "TagC", "TagD";
// 订单列表
List<OrderStep> orderList = new ProducerInOrder().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++)
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("OrderTopic", tags[i % tags.length], "KEY" + i, body.getBytes());
//自定义消息队列选取规则
// SendResult sendResult = producer.send(msg, new MessageQueueSelector()
// @Override
// public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
// Long id = (Long) arg; //根据订单id选择发送queue
// long index = id % mqs.size();
// return mqs.get((int) index);
//
// , orderList.get(i).getOrderId());//订单id
//SelectMessageQueueByHash,官方提供的选取规则,还有其他实现,大家自行发现
SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
producer.shutdown();
/**
* 订单的步骤
*/
private static class OrderStep
private long orderId;
private String desc;
public long getOrderId()
return orderId;
public void setOrderId(long orderId)
this.orderId = orderId;
public String getDesc()
return desc;
public void setDesc(String desc)
this.desc = desc;
@Override
public String toString()
return "OrderStep" +
"orderId=" + orderId +
", desc='" + desc + '\\'' +
'';
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders()
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo)万字总结webpack实战案例配置