关于RocketMQ的基础API操作——这一篇就够了
Posted 默辨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于RocketMQ的基础API操作——这一篇就够了相关的知识,希望对你有一定的参考价值。
关于RocketMQ的基础操作
写在前面:本文中出现的示例及代码,均来源RocketMQ的官网以及B站尚硅谷的RocketMQ视频教程。结合整理出了一个相对简单的笔记。
本文仅为自己学习RocketMQ的一个知识笔记,也希望这个笔记能帮助到你。
一、基础API操作
下面的案例需要引入对应的maven依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
1、 普通消息
普通消息的消息生产者分别通过三种方式发送消息:同步发送、异步发送以及单向发送。以DefaultMQProducer为起点。
1.1、消息生产端
1)Producer端发送同步消息——send
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer
public static void main(String[] args) throws Exception
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++)
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
2)发送异步消息——send+SendCallback
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。
public class AsyncProducer
public static void main(String[] args) throws Exception
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
// 设置异步重试次数为0次,表示失败后不重试。默认2次
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++)
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
@Override
public void onException(Throwable e)
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
);
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
3)单向发送消息的样例——sendOneway
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返 回ACK。该方式的消息发送效率最高,但消息可靠性较差。
这种方式主要用在不特别关心发送结果的场景,例如日志发送
public class OnewayProducer
public static void main(String[] args) throws Exception
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++)
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
1.2、消息消费端
消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
通常情况下,用**推模式(DefaultMQPushConsumer)**比较简单,实际上RocketMQ的推模式也是由拉模式(DefaultMQPullConsumerImpl)封装出来的。
4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl
1)push模式——DefaultMQPushConsumer
public class Consumer
public static void main(String[] args) throws InterruptedException, MQClientException
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
2)pull模式——DefaultMQPullConsumer
public class PullConsumer
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs)
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true)
try
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus())
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
catch (Exception e)
e.printStackTrace();
consumer.shutdown();
private static long getMessageQueueOffset(MessageQueue mq)
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
private static void putMessageQueueOffset(MessageQueue mq, long offset)
OFFSE_TABLE.put(mq, offset);
2、顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
2.1、消息生产端
public class Producer
public static void main(String[] args) throws Exception
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]"TagA", "TagC", "TagD";
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++)
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
producer.shutdown();
/**
* 订单的步骤
*/
private static class OrderStep
private long orderId;
private String desc;
public long getOrderId()
return orderId;
public void setOrderId(long orderId)
this.orderId = orderId;
public String getDesc()
return desc;
public void setDesc(String desc)
this.desc = desc;
@Override
public String toString()
return "OrderStep" +
"orderId=" + orderId +
", desc='" + desc + '\\'' +
'';
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders()
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add关于el-upload看这一篇就够了