rocketMQ安装配置+与java交互API操作+集群搭建+高级特性

Posted ahcfl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ安装配置+与java交互API操作+集群搭建+高级特性相关的知识,希望对你有一定的参考价值。

文章目录

一、RocketMQ

1. MQ概述

MQ全称 Message Queue(消息队列)

存储消息的中间件 是在 消息的传输过程中保存消息的容器。
多用于分布式系统之间进行通信。
消息队列是典型例子: 生产者消费者模型。

MQ消息队列, 存储消息的中间件

分布式系统通信方式:
直接远程调用
借助第三方完成间接通信
发送方称为生产者, 接收方称为消费者

MQ 的作用

  1. 应用解耦:
    MQ中间件的加入,解开了两台服务器之间的强依赖
  2. 速应用变更维护 :
    生成者将消息存放到了MQ中,消费者从MQ中获取消息进行消费.
    可以根据消息的多少,在分配消费者服务器的数量
  3. 流量削锋(削峰填谷):
    生成者将消息存放到了MQ中,消费者从MQ中获取消息进行消费.
    消费者量力而行,在不宕机的情况下尽可能的消费消息

MQ的缺点

1.系统可用性降低: 集群
2.系统复杂度提高:(程序员提升水平)
3.异步消息机制(都有解决方案)
消息顺序性
消息丢失
消息一致性
消息重复使用

常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

image-20210615234600073

image-20210615234752072

2. RocketMQ简介

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,
后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力
(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

image-20210615235910627

Apache官网地址: https://www.apache.org/

rocketMQ下载地址: http://rocketmq.apache.org/dowloading/releases/

3. RocketMQ安装与启动

安装

注意: RocketMQ是使用java语言开发的,所以在安装RocketMQ前先安装JDK.

# 1、 Liunx安装JDK
a. 查看当前Linux系统是否已经安装java
	 rpm -qa | grep java
b. 将要安装的软件上传到linux服务器上
	/software (该目录存放我们上传的软件压缩包)
c. 将软件安装到 /usr/local/jdk(jdk目录需要自己创建)
	mkdir jdk
d. 将软件压缩包解压到 jdk目录下
	(进入jdk目录)
	tar -xvf /software/jdk....
# jdk软件已经安装完毕,接下来需要配置环境变量(修改配置文件)
e. 修改linux的配置文件 (/etc/profile)
	vim /etc/profile 
	编辑 profile文件在文件的最下方添加:
        export JAVA_HOME=/usr/local/jdk/jdk1.8.0_181 
        export PATH=$JAVA_HOME/bin:$PATH
f. 重写加载 profile 文件
	source /etc/profile
g. 验证是否安装成功
	java 
	java -version
	
# 2、安装RocketMQ
要求: 在Linux上必须有jdk环境(1.8以上)
# 先将RocketMQ压缩包上传到Linux服务器上
# 解压到根目录下(方便RocketMQ高级的集群配置)
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
------------------------
目录介绍:
    benchmark: rocketmq测试目录,用于测试MQ
    bin: 存放rocketmq可执行命令
        mqnamesrv: 启动命名服务器
        mqbroker: 启动代理服务器
    conf: 存放rocketmq配置文件
    lib: 存放依赖jar包
-------------------------
# 配置RocketMQ运行时占用的内存空间  改为下面
vim /rocketmq/bin/runbroker.sh
vim /rocketmq/bin/runserver.sh  
# 开发环境配置 JVM Configuration  
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

启动命名服务器

# 进入rocketMQ安装目录下的bin目录
# 启动nameserv
sh mqnamesrv 
# 查询name server的状态
ps -ef | grep mqnamesrv

启动Broker

新开一个窗口,启动Broker代理服务器

# 进入rocketMQ安装目录下的bin目录
# 启动mq服务 -n 指定nameserv的地址
sh mqbroker -n localhost:9876  

注意:首次启动会报错,因为broker默认占用内存过大,我们需要调整参数
====需修改mqbroker对应的runbroker.sh文件,将占用内存改小====
设置为256M

再次重启
# 重启mq服务 -n 指定nameserv的地址
sh mqbroker -n localhost:9876  
image-20210616001200038

测试

再另起一个窗口测试

# 关闭防火墙
systemctl stop firewalld.service 

# 设置命名服务器位置
export NAMESRV_ADDR=localhost:9876
# 进入bin目录.执行测试程序  随时ctrl+c暂停
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

# 接收的消息如下
MessageExt [
    queueId=1, 
    storeSize=179, 
    queueOffset=256, 
    sysFlag=0, 
    bornTimestamp=1616681027046, 
    bornHost=/192.168.190.143:48824, 
    storeTimestamp=1616681027048, 
    storeHost=/192.168.190.143:10911, 
    msgId=C0A8BE8F00002A9F000000000002D147, 
    commitLogOffset=184647, 
    bodyCRC=1392906658, 
    reconsumeTimes=0, 
    preparedTransactionOffset=0, 
    toString()=Message{
      topic='TopicTest', 
      flag=0, 
      properties={
          MIN_OFFSET=0, 
          MAX_OFFSET=422, 
          CONSUME_START_TIME=1616761308096, 
          UNIQ_KEY=C0A8BE8F265163947C6B805495E6004E, 
          WAIT=true, 
          TAGS=TagA
      }, 
      body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56], 
      transactionId='null'
	}
]

brokerName:broker名称
queueId:记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
storeSize:记录消息在Broker存盘大小
queueOffset:记录在ConsumeQueue中的偏移
sysFlag:记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp:消息创建时间,在Producer发送消息时设置
bornHost:记录发送改消息的producer地址
storeTimestamp:消息存储时间
storeHost:记录存储该消息的Broker地址
msgId:消息Id
commitLogOffest:记录消息在Broker中存储偏移
bodyCRC:消息内容CRC校验值
reconsumeTimes:消息重试消费次数
body:Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
transactionId:事务消息相关的事务编号
preparedTransactionOffset:
message
	topic:话题
	flag:网络通信层标记
	properties
      MIN_OFFSET:最小偏移
      MAX_OFFSET:最大偏移
      CONSUME_START_TIME:消费拉取时间
      CLUSTER:集群
      TAGS:消息标签
      UNIQ_KEY:
      WAIT:
      body: 消息内容

关闭服务

先关broker: sh mqshutdown broker
再关namesrv: sh mqshutdown namesrv

二、API应用

1. 单生产者单消费者

创建maven项目导入jar包坐标

<!-- 导入rocketMQ的jar包坐标 -->
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.5.2</version>
    </dependency>
</dependencies>

生产者端

package com.ahcfl.demo1_one2one;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * TODO:单生产,单消费
 * 生产者代码
 */
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.设置命名服务器的路径
        producer.setNamesrvAddr("192.168.190.129:9876");
        //3.启动生产者对象
        producer.start();
        System.out.println("==========生产者启动了==============");
        //4.创建消息(同步)
        // 参数1: topic,主题
        // 参数2: 存放的消息信息
        Message msg = new Message("topic1","hello rocketMQ !".getBytes("utf-8"));
        //5.发送消息
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);
        //6.关闭生产者客户端
        //producer.shutdown();
    }
}

消费者端

package com.ahcfl.demo1_one2one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * TODO:单生产者,单消费者
 * 消费者代码
 */
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设置命名服务器的地址
        consumer.setNamesrvAddr("192.168.190.129:9876");
        //3.设置订阅的topic和小标记
        consumer.subscribe("topic1","*");
        //4.设置监听,当topic1下有内容时获取对应的消息
        consumer.registerMessageListener(
                // 设置同步消息监听
                new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        for (MessageExt msg : msgs) {
                            System.out.println("获取的消息为: "+msg);
                            System.out.println("消息ID为: "+msg.getMsgId());
                            System.out.println("队列ID为: "+msg.getQueueId());
                            System.out.println("消息内容: "+new String(msg.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
        );
        //5.启动消费者
        consumer.start();
        System.out.println("==========消费者启动了============");
    }
}

2. 单生产者多消费者

生产者

package com.ahcfl.demo2_one2many;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * 测试一个生产者多个消费者
 * 编写生产者代码
 */
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建生产者对象,用于生产消息
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        //2.设置命名服务器地址和端口
        producer.setNamesrvAddr("192.168.190.129:9876");
        //3.启动生产者
        producer.start();
        //4.创建消息
        for (int i = 1; i <=10 ; i++) {
            Message msg = new Message("topic2",("hello rocketMQ "+i).getBytes("utf-8"));
            SendResult result = producer.send(msg);
            System.out.println("生产消息返回值: "+result);
        }
        //5.关闭生产者
        producer.shutdown();
    }
}

消费者

负载均衡

多个消费者平均分配消息数量

package com.ahcfl.demo2_one2many;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * TODO:测试一个生产者多个消费者
 *      编写消费者代码
 *      测试时先启动多个消费者,再启动生产者
 */
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建消费者对象,用于消费消息
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
        //2.设置命名服务器的地址和端口
        consumer.setNamesrvAddr("192.168.190.129:9876");
        //3.设置消费者订阅
        consumer.subscribe("topic2","*");
        // TODO: 设置消费模式
        // MessageModel.CLUSTERING : 负载均衡模式,该模式是默认的
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //4.设置监听,监听topic2主题下的所有消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 同步状态监听
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("获取的消息为: "+new String(msg.getBody()));
                }
                // 返回状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        System.out.println("=========消费者1111111111启动了==========");
        //5.启动消费者
        consumer.start();
    }
}

广播模式
package com.ahcfl.demo2_one2many;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * TODO:测试一个生产者多个消费者
 *      编写消费者代码
 *      测试时先启动多个消费者,再启动生产者
 */
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建消费者对象,用于消费消息
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
        //2.设置命名服务器的地址和端口
        consumer.setNamesrvAddr("192.168.190.129:9876");
        //3.设置消费者订阅
        consumer.subscribe("topic2","*");
        // TODO: 设置消费模式
        // MessageModel.CLUSTERING : 负载均衡模式,该模式是默认的
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        // MessageModel.BROADCASTING: 广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.设置监听,监听topic2主题下的所有消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 同步状态监听
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("获取的消息为: "+new String(msg.getBody()));
                }
                // 返回状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        System.out.println("=========消费者222222222222启动了==========");
        //5.启动消费者
        consumer.start();
    }
}

广播模式的现象
  1) 如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
  2) 如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论:
 	 必须先启动消费者再启动生产者才有广播的效果

3. 多生产者多消费者

生产者

package com.ahcfl.demo3_many2many;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * 测试一个生产者多个消费者
 * 编写生产者代码
 */
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        //1.创建生产者对象,用于生产消息
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        //2.设置命名服务器地址和端口
        producer.setNamesrvAddr("192.168.190.129:9876");
        //3.启动生产者
        producer.start();
        //4.创建消息
        for (int i = 1; i <=20 ; i++) {
            Message msg = new Message("topic3",("hello rocketMQ "+i).getBytes("utf-8"));
            SendResult result = producer.send(msg);
            System.out.println("生产消息返回值: "+result);
        }
        //5.关闭生产者
        producer.shutdown();
    }
}

消费者

package com.ahcfl.demo3_many2many;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * TODO:测试一个生产者

以上是关于rocketMQ安装配置+与java交互API操作+集群搭建+高级特性的主要内容,如果未能解决你的问题,请参考以下文章

安装配置RocketMQ

安装配置RocketMQ

安装配置RocketMQ

rocketmq 安装与配置以及遇到的问题

rocketmq安装与基本操作

RocketMQ