rocketmq 消息队列 初步了解
Posted benjious
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq 消息队列 初步了解相关的知识,希望对你有一定的参考价值。
消息队列概述
队列的本质
-
一次RPC变成两次 RPC
-
内容转储
-
选择合适的时机投递
队列设计重点
-
RPC 通信协议
-
存储选型
-
消费关系处理
-
实现事务
-
防丢/防重
-
批量/异步与性能 强烈推荐这篇文章,从设计的角度来思考消息队列的各种问题,阅读源码只是理解设计的最终实现,只有知道了设计的思路阅读源码才会更加容易理解和更加容易吸收。
使用示例
生产端
public class Product { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
消费端
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
RocketMQ名词解释
-
NameServer:在系统中 是做命名服务,更新和发现 broker服务。就像dubbo 中的 zk一样的角色。主要作用为消息生产和消费消费者提供关于主题Topic 的路由消息
-
Broker-Master:broker 消息主机服务器。
-
Broker-Slave: broker 消息从机服务器。
-
Producer: 消息生产者。
-
Consumer: 消息消费者。
角色之间如何工作
Broker 消息服务器在启动时向所有NameServer注册,NameServer启动定时任务,监视broker 是否存活,producer在发送消息前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。
RocketMQ启动
我们先来看一下启动的脚本
start mqnamesrv.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
可以知道实际就是运行两个cmd 文件,启动的相关类有 : BrokerStartup 和 NamesrvStartup 。 后面的文章我们将会介绍关于这两个类
总结
文章从设计思路入手来了解消息队列,后面的文章我们将通过阅读源码落地设计思路。
参考资料
- http://gd-rus-public.cn-hangzhou.oss-pub.aliyun-inc.com/attachment/201604/08/20160408165024/RocketMQ_design.pdf
- https://erik1288.github.io/
- https://erik1288.github.io/2017/11/03/RocketMQ-Message-send-and-persistence/
- https://zhuanlan.zhihu.com/p/21649950 (美团,设计一个消息队列)
- https://segmentfault.com/a/1190000015301449 (消息队列的面试题)
以上是关于rocketmq 消息队列 初步了解的主要内容,如果未能解决你的问题,请参考以下文章