学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)
Posted 冷血~多好
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)相关的知识,希望对你有一定的参考价值。
目录
使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837
什么是rabbitMQ
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,能够实现异步消息处理
RabbitMQ是一个消息代理:它接受和转发消息。
你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块
优点:异步消息处理
业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),将下单操作主流程:扣减库存、生成订单然后通过MQ消息队列完成通知,发红包、发短信
错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
RabbitMQ网站端口号:15672
程序里面实现的端口为:5672
使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837
1.拉取RabbitMQ镜像
docker pull rabbitmq:management
2.运行RabbitMQ镜像
docker run -itd --name rabbit01 --hostname myrabbit -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
注意:RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian
这里设置的是(RABBITMQ_DEFAULT_USER)登录的账号和( RABBITMQ_DEFAULT_PASS)密码,根据自身来修改
这里看到容器已经开启成功了,然后就可以使用了
3.通过浏览器打开
如果你使用的是本地虚拟机,那么你直接使用虚拟机显示的ipv4地址加端口号就可以访问了;
如果你使用的是云服务器,那么你需要在对应服务器(阿里云,腾讯云等)的安全组中开放15672
端口,并且在防火墙中也开放15672端口
显示如上图那么就可以开始使用了
然后通过命令进入rabbitmq容器
docker exec -it rabbit01 /bin/bash
授权账号和密码
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
-
administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
-
monitoring:监控者 登录控制台,查看所有信息
-
policymaker:策略制定者 登录控制台,指定策略
-
managment 普通管理员 登录控制台
为用户添加资源权限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
也可以在界面操作进行添加用户
RabbitMQ支持的消息模型
1.简单模式 Simple
2.工作模式 Work
3.发布订阅模式
4.路由模式
5.主题 Topic模式
6.参数模式
7.出版商确认模式
1.入门案例
1. RabbitMQ入门案例 - Simple 简单模式
jdk1.8
构建一个 maven工程
定义生产者
定义消费者
观察消息的在 rabbitmq-server服务中的进程
01 构建一个maven工程
02 导入依赖
<dependencies>
<!--导入rabbitmq的依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
3.代码编写
在上图的模型中,有以下概念:
生产者,也就是要发送消息的程序
消费者:消息的接受者,会一直等待消息到来。
消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
package com.chen.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @description: 简单模式Simple
*/
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1: 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("128.197.157.151");
connectionFactory.setPort(5672);
connectionFactory.setUsername("chenjinxian");//rabbitmq登录的账号
connectionFactory.setPassword("chenjinxian");//rabbitmq登录的密码
connectionFactory.setVirtualHost("/");
//springboot ---rabbitmq
Connection connection = null;
Channel channel = null;
try {
// 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channel
connection = connectionFactory.newConnection("生成者");
// 3: 通过连接获取通道Channel
channel = connection.createChannel();
// 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
String queueName = "queue1";
/*
* @params1 队列的名称
* @params2 是否要持久化durable=false 所谓持久化消息是否存盘,如果false 非持久化 true是持久化? 非持久化会存盘吗? 会存盘,但是会随从重启服务会丢失。
* @params3 排他性,是否是独占独立
* @params4 是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
* @params5 携带附属参数
*/
channel.queueDeclare(queueName, true, false, false, null);
// 5: 准备消息内容
String message = "Hello chenjinxian!!!";
// 6: 发送消息给队列queue
// @params1: 交换机 @params2 队列、路由key @params 消息的状态控制 @params4 消息主题
// 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
// 7: 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 8: 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1: 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("128.197.157.151");//服务器IP
connectionFactory.setPort(5672);
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2: 创建连接Connection
connection = connectionFactory.newConnection("消费者");
// 3: 通过连接获取通道Channel
channel = connection.createChannel();
// 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
// true = ack 正常的逻辑是没问题 死循环 rabbit 重发策略
// false = nack 消息这在消费消息的时候可能会异常和故障
final Channel channel2 = channel;
channel2.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
channel2.basicAck(message.getEnvelope().getDeliveryTag(),false);
}catch (Exception ex){
ex.printStackTrace();
// 三次确认 -- reject + sixin
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受失败了...");
}
});
System.out.println("开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
// 7: 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 8: 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
2. 什么是AMQP
01 什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
02 AMQP生产者流转过程
03 AMQP消费者流转过程
3. RabbitMQ的核心组成部分
01 RabbitMQ的核心组成部分
核心概念: 核心概念:
Server :又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-serverConnection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手 服务器:又称Broker,接受客户端的连接,实现AMQP实体服务。安装Rabbitmq-serverConnection:连接,应用程序与Broker的网络连接tcp/ip/三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进 息读写的通道,客户端可以建立对恪Channel,每个Channel代表一个会话任务。 频道:网络信道,几乎所有的操作都在频道中进行频道,是进息读写的通道,客户端可以建立对恪频道频道,每个频道代表一个会话任务频道。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。 消息:消息:服务与应用程序之间传送的数据,由Properties和Body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange 虚拟主机虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key. 交换:交换机,接受消息,根据路由键发送消息到绑定的队列.(=不具备消息存储的能力==)绑定:Exchange和Queue之间的虚拟连接,Binding中可以保护多个路由密钥。
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特疋消恳.bttos:/bloq.csdn.net/qg _4485823(Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费苦。"gwa" 路由密钥:是一个路由规则,虚拟机可以用它来确定如何路由一个特征消息(队列:队列:也成为消息队列,消息队列,保存消息并将它们转发给消费者.
02 RabbitMQ整体架构是什么样子的?
03 RabbitMQ的运行流程
4. RabbitMQ入门案例 - fanout 模式
01 RabbitMQ的模式之发布订阅模式
发布订阅模式的具体实现
-
类型:fanout
-
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
package com.chen.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
发布订阅模式的具体实现
类型:fanout
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.156.157.161");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello xuexi!!!";
// 6:准备交换机
String exchangeName = "fanout_change";
// 8: 指定交换机的类型
String type = "fanout";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName,"", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
发布订阅模式的具体实现
类型:fanout
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
*/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
//new Thread(runnable, "queue5").start();
}
}
5. RabbitMQ入门案例 - Direct 模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
package com.chen.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
Direct 模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.176.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello direct_exchange!!!";
// 6:准备交换机
String exchangeName = "direct_exchange";
// 7: 定义路由key
String routeKey = "email";
// 8: 指定交换机的类型
String type = "direct";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
Direct 模式
*/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
// new Thread(runnable, "queue5").start();
}
}
6. RabbitMQ入门案例 - Topic 模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
package com.chen.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
Topic模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("125.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello topic_exchange!!!";
// 6:准备交换机
String exchangeName = "topic_exchange";
// 7: 定义路由key
String routeKey = "com.order.user";
// 8: 指定交换机的类型
String type = "topic";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者不变
完整案例(创建交换机,创建队列,交换机与队列绑定)
package com.chen.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
完整案例
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("151.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = " 你好,小白";
// 交换机
String exchangeName = "direct_message_exchange";
// 交换机的类型 direct/topic/fanout/headers
String exchangeType = "direct";
// 如果你用界面把queueu 和 exchange的关系先绑定话,你代码就不需要在编写这些声明代码可以让代码变得更加简洁,但是不容读懂
// 如果用代码的方式去声明,我们要学习一下
// 7: 声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失,如果是true代表不丢失,false重启就会丢失
channel.exchangeDeclare(exchangeName,exchangeType,true);
// 8: 声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
// 9:绑定队列和交换机的关系
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");
channel.basicPublish(exchangeName, "course", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
执行完后生成队列和交换机
7. RabbitMQ入门案例 - Work模式
01 Work模式轮询模式(Round-Robin)
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
-
轮询模式的分发:一个消费者一条,按均分配
-
公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
01轮询模式
生产者
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
轮询模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学相伴:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
轮询模式
*/
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.155");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
轮询模式
*/
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.195.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化以上是关于学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)的主要内容,如果未能解决你的问题,请参考以下文章