rocketMQ安装配置+与java交互API操作+集群搭建+高级特性
Posted 编程小栈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ安装配置+与java交互API操作+集群搭建+高级特性相关的知识,希望对你有一定的参考价值。
文章目录
一、RocketMQ
1. MQ概述
MQ全称 Message Queue(消息队列)
存储消息的中间件 是在 消息的传输过程中保存消息的容器。
多用于分布式系统之间进行通信。
消息队列是典型例子: 生产者消费者模型。
MQ消息队列, 存储消息的中间件
分布式系统通信方式:
直接远程调用
借助第三方完成间接通信
发送方称为生产者, 接收方称为消费者
MQ 的作用
- 应用解耦:
MQ中间件的加入,解开了两台服务器之间的强依赖 - 速应用变更维护 :
生成者将消息存放到了MQ中,消费者从MQ中获取消息进行消费.
可以根据消息的多少,在分配消费者服务器的数量 - 流量削锋(削峰填谷):
生成者将消息存放到了MQ中,消费者从MQ中获取消息进行消费.
消费者量力而行,在不宕机的情况下尽可能的消费消息
MQ的缺点
1.系统可用性降低: 集群
2.系统复杂度提高:(程序员提升水平)
3.异步消息机制(都有解决方案)
消息顺序性
消息丢失
消息一致性
消息重复使用
常见产品
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
2. RocketMQ简介
RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,
后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力
(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)
Apache官网地址: https://www.apache.org/
rocketMQ下载地址: http://rocketmq.apache.org/dowloading/releases/
3. RocketMQ安装与启动
安装
注意: RocketMQ是使用java语言开发的,所以在安装RocketMQ前先安装JDK.
# 1、 Liunx安装JDK
a. 查看当前Linux系统是否已经安装java
rpm -qa | grep java
b. 将要安装的软件上传到linux服务器上
/software (该目录存放我们上传的软件压缩包)
c. 将软件安装到 /usr/local/jdk(jdk目录需要自己创建)
mkdir jdk
d. 将软件压缩包解压到 jdk目录下
(进入jdk目录)
tar -xvf /software/jdk....
# jdk软件已经安装完毕,接下来需要配置环境变量(修改配置文件)
e. 修改linux的配置文件 (/etc/profile)
vim /etc/profile
编辑 profile文件在文件的最下方添加:
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
f. 重写加载 profile 文件
source /etc/profile
g. 验证是否安装成功
java
java -version
# 2、安装RocketMQ
要求: 在Linux上必须有jdk环境(1.8以上)
# 先将RocketMQ压缩包上传到Linux服务器上
# 解压到根目录下(方便RocketMQ高级的集群配置)
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
------------------------
目录介绍:
benchmark: rocketmq测试目录,用于测试MQ
bin: 存放rocketmq可执行命令
mqnamesrv: 启动命名服务器
mqbroker: 启动代理服务器
conf: 存放rocketmq配置文件
lib: 存放依赖jar包
-------------------------
# 配置RocketMQ运行时占用的内存空间 改为下面
vim /rocketmq/bin/runbroker.sh
vim /rocketmq/bin/runserver.sh
# 开发环境配置 JVM Configuration
JAVA_OPT="$JAVA_OPT -server -Xms256m -Xmx256m -Xmn128m"
启动命名服务器
# 进入rocketMQ安装目录下的bin目录
# 启动nameserv
sh mqnamesrv
# 查询name server的状态
ps -ef | grep mqnamesrv
启动Broker
新开一个窗口,启动Broker代理服务器
# 进入rocketMQ安装目录下的bin目录
# 启动mq服务 -n 指定nameserv的地址
sh mqbroker -n localhost:9876
注意:首次启动会报错,因为broker默认占用内存过大,我们需要调整参数
====需修改mqbroker对应的runbroker.sh文件,将占用内存改小====
设置为256M
再次重启
# 重启mq服务 -n 指定nameserv的地址
sh mqbroker -n localhost:9876
测试
再另起一个窗口测试
# 关闭防火墙
systemctl stop firewalld.service
# 设置命名服务器位置
export NAMESRV_ADDR=localhost:9876
# 进入bin目录.执行测试程序 随时ctrl+c暂停
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 接收的消息如下
MessageExt [
queueId=1,
storeSize=179,
queueOffset=256,
sysFlag=0,
bornTimestamp=1616681027046,
bornHost=/192.168.190.143:48824,
storeTimestamp=1616681027048,
storeHost=/192.168.190.143:10911,
msgId=C0A8BE8F00002A9F000000000002D147,
commitLogOffset=184647,
bodyCRC=1392906658,
reconsumeTimes=0,
preparedTransactionOffset=0,
toString()=Message
topic='TopicTest',
flag=0,
properties=
MIN_OFFSET=0,
MAX_OFFSET=422,
CONSUME_START_TIME=1616761308096,
UNIQ_KEY=C0A8BE8F265163947C6B805495E6004E,
WAIT=true,
TAGS=TagA
,
body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56],
transactionId='null'
]
brokerName:broker名称
queueId:记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
storeSize:记录消息在Broker存盘大小
queueOffset:记录在ConsumeQueue中的偏移
sysFlag:记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp:消息创建时间,在Producer发送消息时设置
bornHost:记录发送改消息的producer地址
storeTimestamp:消息存储时间
storeHost:记录存储该消息的Broker地址
msgId:消息Id
commitLogOffest:记录消息在Broker中存储偏移
bodyCRC:消息内容CRC校验值
reconsumeTimes:消息重试消费次数
body:Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
transactionId:事务消息相关的事务编号
preparedTransactionOffset:
message
topic:话题
flag:网络通信层标记
properties
MIN_OFFSET:最小偏移
MAX_OFFSET:最大偏移
CONSUME_START_TIME:消费拉取时间
CLUSTER:集群
TAGS:消息标签
UNIQ_KEY:
WAIT:
body: 消息内容
关闭服务
先关broker: sh mqshutdown broker
再关namesrv: sh mqshutdown namesrv
二、API应用
1. 单生产者单消费者
创建maven项目导入jar包坐标
<!-- 导入rocketMQ的jar包坐标 -->
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
</dependencies>
生产者端
package com.ahcfl.demo1_one2one;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* TODO:单生产,单消费
* 生产者代码
*/
public class ProducerDemo
public static void main(String[] args) throws Exception
//1.创建生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设置命名服务器的路径
producer.setNamesrvAddr("192.168.190.129:9876");
//3.启动生产者对象
producer.start();
System.out.println("==========生产者启动了==============");
//4.创建消息(同步)
// 参数1: topic,主题
// 参数2: 存放的消息信息
Message msg = new Message("topic1","hello rocketMQ !".getBytes("utf-8"));
//5.发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
//6.关闭生产者客户端
//producer.shutdown();
消费者端
package com.ahcfl.demo1_one2one;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* TODO:单生产者,单消费者
* 消费者代码
*/
public class ConsumerDemo
public static void main(String[] args) throws Exception
//1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设置命名服务器的地址
consumer.setNamesrvAddr("192.168.190.129:9876");
//3.设置订阅的topic和小标记
consumer.subscribe("topic1","*");
//4.设置监听,当topic1下有内容时获取对应的消息
consumer.registerMessageListener(
// 设置同步消息监听
new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
for (MessageExt msg : msgs)
System.out.println("获取的消息为: "+msg);
System.out.println("消息ID为: "+msg.getMsgId());
System.out.println("队列ID为: "+msg.getQueueId());
System.out.println("消息内容: "+new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
//5.启动消费者
consumer.start();
System.out.println("==========消费者启动了============");
2. 单生产者多消费者
生产者
package com.ahcfl.demo2_one2many;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* 测试一个生产者多个消费者
* 编写生产者代码
*/
public class ProducerDemo
public static void main(String[] args) throws Exception
//1.创建生产者对象,用于生产消息
DefaultMQProducer producer = new DefaultMQProducer("group2");
//2.设置命名服务器地址和端口
producer.setNamesrvAddr("192.168.190.129:9876");
//3.启动生产者
producer.start();
//4.创建消息
for (int i = 1; i <=10 ; i++)
Message msg = new Message("topic2",("hello rocketMQ "+i).getBytes("utf-8"));
SendResult result = producer.send(msg);
System.out.println("生产消息返回值: "+result);
//5.关闭生产者
producer.shutdown();
消费者
负载均衡
多个消费者平均分配消息数量
package com.ahcfl.demo2_one2many;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* TODO:测试一个生产者多个消费者
* 编写消费者代码
* 测试时先启动多个消费者,再启动生产者
*/
public class ConsumerDemo
public static void main(String[] args) throws Exception
//1.创建消费者对象,用于消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
//2.设置命名服务器的地址和端口
consumer.setNamesrvAddr("192.168.190.129:9876");
//3.设置消费者订阅
consumer.subscribe("topic2","*");
// TODO: 设置消费模式
// MessageModel.CLUSTERING : 负载均衡模式,该模式是默认的
consumer.setMessageModel(MessageModel.CLUSTERING);
//4.设置监听,监听topic2主题下的所有消息
consumer.registerMessageListener(new MessageListenerConcurrently()
// 同步状态监听
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
for (MessageExt msg : msgs)
System.out.println("获取的消息为: "+new String(msg.getBody()));
// 返回状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
System.out.println("=========消费者1111111111启动了==========");
//5.启动消费者
consumer.start();
广播模式
package com.ahcfl.demo2_one2many;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* TODO:测试一个生产者多个消费者
* 编写消费者代码
* 测试时先启动多个消费者,再启动生产者
*/
public class ConsumerDemo
public static void main(String[] args) throws Exception
//1.创建消费者对象,用于消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
//2.设置命名服务器的地址和端口
consumer.setNamesrvAddr("192.168.190.129:9876");
//3.设置消费者订阅
consumer.subscribe("topic2","*");
// TODO: 设置消费模式
// MessageModel.CLUSTERING : 负载均衡模式,该模式是默认的
//consumer.setMessageModel(MessageModel.CLUSTERING);
// MessageModel.BROADCASTING: 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置监听,监听topic2主题下的所有消息
consumer.registerMessageListener(new MessageListenerConcurrently()
// 同步状态监听
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
for (MessageExt msg : msgs)
System.out.println("获取的消息为: "+new String(msg.getBody()));
// 返回状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
System.out.println("=========消费者222222222222启动了==========");
//5.启动消费者
consumer.start();
广播模式的现象
1) 如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
2) 如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论:
必须先启动消费者再启动生产者才有广播的效果
3. 多生产者多消费者
生产者
package com.ahcfl.demo3_many2many;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* 测试一个生产者多个消费者
* 编写生产者代码
*/
public class ProducerDemo
public static void main(String[] args) throws Exception
//1.创建生产者对象,用于生产消息
DefaultMQProducer producer = new DefaultMQProducer("group3");
//2.设置命名服务器地址和端口
producer.setNamesrvAddr("192.168.190.129:9876");
//3.启动生产者
producer.start();
//4.创建消息
for (int i = 1; i <=20 ; i++)
Message msg = new Message("topic3",("hello rocketMQ "+i).getBytes("utf-8"));
SendResult result = producer.send(msg);
System.out.println("生产消息返回值: "+result);
//5.关闭生产者
producer.shutdown();
消费者
package com.ahcfl.demo3_many2many;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* TODO:测试一个生产者多个消费者
* 编写消费者代码
* 测试时先启动多个消费者,再启动生产者
*/
public class 安装配置RocketMQ