1RocketMQ 源码解析之 Hello World
Posted carl-zhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1RocketMQ 源码解析之 Hello World相关的知识,希望对你有一定的参考价值。
Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可伸缩性。
它提供了多种功能:
- 消息传递模式,包括发布/订阅、请求/应答和流
- 金融级交易信息
- 内置容错和高可用性配置选项基于DLedger
- 多种跨语言客户端,如Java, C/ c++, Python, Go, Node.js
- 可插入传输协议,如TCP, SSL, AIO
- 内置消息跟踪功能,还支持开放跟踪
- 多功能大数据和流生态系统集成
- 按时间或偏移量计算的消息回溯性
- 在同一个队列中,可靠的FIFO和严格有序的消息传递
- 高效的拉推消费模式
- 单个队列中的百万级消息积累能力
- 多种消息传递协议,如JMS和OpenMessaging
- 灵活的分布式扩展部署架构
- 闪电般的批处理消息交换系统
- 各种消息过滤机制,如SQL和Tag
- Docker映像用于隔离测试和云隔离集群
- 功能丰富的管理仪表板,用于配置、度量和监视
- 身份验证和授权
- 用于源代码和接收器的免费开源连接器
1、概述
Apache RocketMQ是一个分布式消息和流平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可伸缩性。它由四个部分组成:name servers, brokers, producers and consumers.。它们中的每一个都可以水平扩展,没有单一的故障点。如下图所示:
1.1 NameServer Cluster
NameServer 提供轻量级的服务发现和路由。每个Name Server记录完整的路由信息,并提供相应的读写服务,支持快速的存储扩展。
1.2 Broker Cluster
Broker 通过提供轻量级的TOPIC和QUEUE机制来处理消息存储。它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),并提供强大的峰值填充和按原始时间顺序积累数千亿消息的能力。此外,broker提供灾难恢复、丰富的度量统计和警报机制,这些都是传统消息传递系统所缺乏的。
1.3 Producer Cluster
Producer 支持分布式部署。分布式生产者通过多种负载均衡模式向代理集群发送消息。发送进程支持快速失败和低延迟。
1.4 Consumer Cluster
Consumer 也支持Push和Pull模型中的分布式部署。它还支持集群使用和消息广播。它提供了实时消息订阅机制,能够满足大多数用户的需求。RocketMQ的网站为感兴趣的用户提供了一个简单的快速入门指南。
2、NameServer
NameServer是一个功能齐全的服务器,主要包括两个特性:
- Broker 管理,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着。
- 路由管理,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。
我们知道,RocketMQ客户端(生产者/消费者)将从NameServer查询队列路由信息,但客户端如何找到NameServer地址?
有四种方法提供NameServer地址列表给客户端:
- 程序化的方式,比如
producer.setNamesrvAddr("ip:port")
。 - Java选项,使用
rocketmq.namesrv.addr
。 - 环境变量,使用
NAMESRV_ADDR
。 - HTTP端点。
3、Broker Server
代理服务器负责消息存储和传递、消息查询、HA保证等。
如下图所示,代理服务器有几个重要的子模块:
- Remoting Module:远程模块处理来自客户端的请求。
- Client Manager:客户端管理,管理客户端(生产者/消费者)并维护消费者的主题订阅。
- Store Service:存储服务,提供简单的api在物理磁盘上存储或查询消息。
- HA Service:高可用服务,提供主代理和从代理之间的数据同步功能。
- Index Service:索引服务,根据指定的键为消息建立索引,并提供快速的消息查询。
4、Hello World
4.1 安装 RocketMQ 服务
点击 这里 下载 4.9.2 版本。
4.1.1 解压 RocketMQ 压缩包。
> unzip rocketmq-all-4.9.2-source-release.zip
> cd rocketmq-all-4.9.2/
4.1.2 Start Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功查看日志如下:
4.1.3 Start Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
启动成功查看日志如下:
4.2 Producer 端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer
public static void main(String[] args) throws Exception
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++)
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
控制台打印如下:
4.3 Consumer 接收消息
public class Consumer
public static void main(String[] args) throws InterruptedException, MQClientException
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
控制台打印如下:
参考文章:
- https://github.com/apache/rocketmq
- https://rocketmq.apache.org/docs/rmq-arc/
- https://rocketmq.apache.org/docs/quick-start/
- https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
以上是关于1RocketMQ 源码解析之 Hello World的主要内容,如果未能解决你的问题,请参考以下文章
1Google Grpc 框架源码解析 之 Hello World