RabbitMq入门案例
Posted 简讯_A
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq入门案例相关的知识,希望对你有一定的参考价值。
Ribbitmq概括
概念
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
看场景理解mq
如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度
Windows环境安装
rabbitMQ是Erlang语言开发的所以先下载
双击安装完成后
-
配置环境变量
系统变量 ERLANG_HOME D:\\java\\erl-24.0 环境变量 %ERLANG_HOME%\\bin
-
windows打开cmd控制台输入cmd,测试输入erl出现一下内容
Eshell V12.0 (abort with ^G) 1>
-
下载RabbitMQ
Rabbitmq: https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.4/rabbitmq-server-3.9.4.exe
-
双击安装
安装完成后,开始安装RabbitMQ-Plugins插件
先cd D:\\java\\RabbitMQ Server\\rabbitmq_server-3.9.4\\sbin
然后运行命令:rabbitmq-plugins enable rabbitmq_management
出现一下画面成功
Enabling plugins on node rabbit@WNDN-750: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@WNDN-750... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins.
-
执行rabbitmqctl status,出现以下内容,说明成功
Status of node rabbit@WNDN-750 ... [1mRuntime[0m OS PID: 14016 OS: Windows Uptime (seconds): 185 Is under maintenance?: false RabbitMQ version: 3.9.4 Node name: rabbit@WNDN-750 Erlang configuration: Erlang/OTP 24 [erts-12.0] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit] Erlang processes: 402 used, 1048576 limit Scheduler run queue: 1 Cluster heartbeat timeout (net_ticktime): 60 [1mPlugins[0m ...............
-
运行 D:\\java\\RabbitMQ Server\\rabbitmq_server-3.9.4\\sbin\\rabbitmq-server.bat
等几秒钟,在浏览器访问http://localhost:15672/
successful......
Linux环境安装
Docker installation
(1)yum 包更新到最新 > yum update (2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的 > yum install -y yum-utils device-mapper-persistent-data lvm2 (3)设置yum源为阿里云 > yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo (4)安装docker > yum install docker-ce -y (5)安装后查看docker版本 > docker -v (6) 安装加速镜像 sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-\'EOF\' "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"] EOF sudo systemctl daemon-reload sudo systemctl restart docker (7) 获取rabbit镜像: > docker pull rabbitmq:management (8)创建并运行容器 > docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management (9)查看日志 > docker logs -f myrabbit (10)查看服务 > docker ps -a (11)关闭容器 > docker be9df4f0292e stop
Other commands
# 启动docker: systemctl start docker # 停止docker: systemctl stop docker # 重启docker: systemctl restart docker # 查看docker状态: systemctl status docker # 开机启动: systemctl enable docker systemctl unenable docker # 查看docker概要信息 docker info # 查看docker帮助文档 docker --help
rabbitmq修改密码
-
在所有应用中找到rabbitMQ command promot程序并单击单开。
-
在打开的命令窗口中输入rabbitmqctl add_user test 123456后回车,test为新增登录账户,123456为账户密码
-
然后再敲入rabbitmqctl set_user_tags test administrator后回车。
-
再给test账户设置 操作主机的权限。敲入rabbitmqctl set_permissions -p / test "." "." ".*"
-
回到登录页面,用账户名为test,密码为123456进行登录就ok了。
Ribbitmq队列
消息队列协议
AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。 特性: 1:分布式事务支持。 2:消息的持久化支持。 3:高性能和高可靠的消息处理优势。
面试题:为什么ribbitmq不使用http协议
-
因为http协议请求头很复杂,包含了cookies,数据的加密解密,状态码等附加功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,他其实就负责数据传递,存储,分发就可以了,一定要追求的是高性能,尽量简洁,快速
-
大部分情况下http都是短连接,在交互过程中可能因为服务器宕机中断以后就不会进行持久化,就会会照成请求的丢失,这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期获取消息得过程,出现问题和故障要对数据或消息进行持久化等,目的就是为了保障数据得高可靠和稳健的运行
消息队列持久化
RabbitMQ在两种情况下会将消息写入磁盘:
-
消息本身在
publish
的时候就要求消息写入磁盘; -
内存紧张
需要将部分内存中的消息转移到磁盘;
消息队列消费策略
MQ消息队列有如下几个角色 1:生产者 2:存储消息 3:消费者 那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。
场景1
比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论。
场景2
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发。
消息队列高可用和高可靠
什么是高可用机制
高可用是指产品在规定的条件和规定的时刻或者时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件的服务器会触及硬件(CPU、内存、磁盘)的极限,一台消息中 间件的服务器已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。
什么是高可靠
在高并发应用场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
保证中间件消息的可靠性尼?可从两个方面考虑
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2、消息的存储可靠:通过持久化来保证消息的可靠性。
反正终归三句话: 1:要么消息共享, 2:要么消息同步 3:要么元数据共享
五种工作模式
简单模式
结构:
生产者:生成消息,发送到交换机
交换机:根据消息属性,将消息发送给队列(如果没有声明交换机,则使用默认交换机)
消费者:监听这个队列,发现消息后,获取消息执行消费逻辑
应用场景:
常见的应用场景就是一发,一接的结构
例如:
手机短信,邮件单发
代码测试
package cn.tedu.test.rabbit; import com.rabbitmq.client.*; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 完成简单模式一发一接的结构 */ public class SimpleMode //初始化连接对象 短连接 private Channel channel; @Before public void channelInit() throws IOException, TimeoutException //ip:port tedu/123456 /* 1.长链接工厂,提供4个属性 ip port tedu 123456 2.获取长连接 3.给成员变量channel赋值 */ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); channel=connection.createChannel(); //测试包含3个方法 //声明组件,交换机和队列,简单模式案例,交换机使用默认交换机.队列需要声明 @Test public void myQueueDeclare() throws IOException //调用channel的方法,声明队列 channel.queueDeclare( "simple",//设置路由key false,//boolean类型,队列是否持久化 false,//boolean类型,队列是否专属, // 只有创建声明队列的连接没有断开,队列才可用 false,//boolean类型,队列是否自动删除.从第一个消费端监听队列开始 //计算,直到最后一个消费端断开连接,队列就会自动删除 null);//map类型,key值是固定一批属性 System.out.println("队列声明成功"); //发送消息到队列 生产端,永远不会吧消息直接发给队列,发给交换机 //目前可以使用7个交换机来接收消息 @Test public void send() throws IOException //准备个消息 发送的是byte[] String msg="宝贝一到手,风紧扯呼"; byte[] msgByte=msg.getBytes(); //将消息发给(AMQP DEFAULT)交换机 名字"" channel.basicPublish( "",//发送给的交换机的名字,默认为空 "simple",//路由key,你想让交换机把消息传递给哪个队列的名称 null,//发送消息时,携带的头,属性等.例如 // app_id content-type priority优先级 msgByte//消息体 ); //消费端 @Test public void consume() throws IOException //消费消息 channel.basicConsume("simple", false, new DeliverCallback() /**传递回调对象. 消息就在这个对象里 * @param consumerTag 当前消费端id * @param message 封装了消息的对象 * @throws IOException */ @Override public void handle(String consumerTag, Delivery message) throws IOException //从消息对象中拿到信息 byte[] body = message.getBody(); System.out.println(new String(body)); //如果autoAck false说明消费完消息,需要手动确认 channel.basicAck( message.getEnvelope().getDeliveryTag(), false); , new CancelCallback() /** * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用 * @param consumerTag 消费端id * @throws IOException */ @Override public void handle(String consumerTag) throws IOException ); //使用while true 将线程卡死,否则看不到消息消费逻辑 while(true);
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式: 1、轮询模式的分发:一个消费者一条,按均分配; 2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
默认轮询,以下为
package com.xuexiangban.rabbitmq.work.lunxun; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer pub try // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 //===============================end topic模式================================== for (int i = 1; i <= 20; i++) //消息的内容 String msg = "学相伴:" + i; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, msg.getBytes()); System.out.println("消息发送成功!");
消费者1的逻辑
Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", true, new DeliverCallback() @Override public void handle(String s, Delivery delivery) throws IOException try System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); catch(Exception ex) ex.printStackTrace(); , new CancelCallback() @Override public void handle(String s) throws IOException );
消费者2的逻辑
Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", true, new DeliverCallback() @Override public void handle(String s, Delivery delivery) throws IOException try System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(200); catch(Exception ex) ex.printStackTrace(); , new CancelCallback() @Override public void handle(String s) throws IOException );
工作争抢
结构
生产者:发送消息到交换机
交换机:根据消息属性将消息发送给队列
消费者:多个消费者,同时绑定监听一个队列,之间形成了争抢消息的效果
应用场景
抢红包
资源分配
代码实现
package cn.tedu.test.rabbit; import com.rabbitmq.client.*; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 完成一发多抢的结构 */ public class WorkMode private Channel channel; @Before public void channelInit() throws IOException, TimeoutException ConnectionFactory factory=new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); channel=connection.createChannel(); @Test public void myQueueDeclare() throws IOException //调用channel的方法,声明队列 channel.queueDeclare( "work", false, false, false, null); System.out.println("队列声明成功"); @Test public void send() throws IOException //准备个消息 发送的是byte[] String msg="宝贝一到手,风紧扯呼1111"; byte[] msgByte=msg.getBytes(); //将消息发给(AMQP DEFAULT)交换机 名字"" channel.basicPublish( "",//发送给的交换机的名字 "work",//路由key,你想让交换机把消息传递给哪个队列的名称 null,//发送消息时,携带的头,属性等.例如 // app_id content-type priority优先级 msgByte//消息体 ); //消费端 @Test public void consume01() throws IOException //消费消息 channel.basicConsume("work", false, new DeliverCallback() /**传递回调对象. 消息就在这个对象里 * @param consumerTag 当前消费端id * @param message 封装了消息的对象 * @throws IOException */ @Override public void handle(String consumerTag, Delivery message) throws IOException //从消息对象中拿到信息 byte[] body = message.getBody(); System.out.println("消费者01:"+new String(body)); //如果autoAck false说明消费完消息,需要手动确认 channel.basicAck( message.getEnvelope().getDeliveryTag(), false); , new CancelCallback() /** * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用 * @param consumerTag 消费端id * @throws IOException */ @Override public void handle(String consumerTag) throws IOException ); //使用while true 将线程卡死,否则看不到消息消费逻辑 while(true); @Test public void consume02() throws IOException //消费消息 channel.basicConsume("work", false, new DeliverCallback() /**传递回调对象. 消息就在这个对象里 * @param consumerTag 当前消费端id * @param message 封装了消息的对象 * @throws IOException */ @Override public void handle(String consumerTag, Delivery message) throws IOException //从消息对象中拿到信息 byte[] body = message.getBody(); System.out.println("消费者02:"+new String(body)); //如果autoAck false说明消费完消息,需要手动确认 channel.basicAck( message.getEnvelope().getDeliveryTag(), false); , new CancelCallback() /** * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用 * @param consumerTag 消费端id * @throws IOException */ @Override public void handle(String consumerTag) throws IOException ); //使用while true 将线程卡死,否则看不到消息消费逻辑 while(true);
路由模式
结构
生产端:发送的消息携带具体的路由key值
交换机:接收路由key值,判断和当前交换机绑定后端队列哪个满足路由的匹配将消息发送给这个队列
应用场景
处理一些特殊的消息逻辑,可以经过路由的筛选
代码测试
package cn.tedu.test.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 路由模式 */ public class DirectMode //初始化连接 private Channel channel; @Before public void channelInit() throws IOException, TimeoutException //ip:port tedu/123456 /* 1.长链接工厂,提供4个属性 ip port tedu 123456 2.获取长连接 3.给成员变量channel赋值 */ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); channel=connection.createChannel(); //准备交换机,队列的名称属性 private static final String TYPE="direct"; private static final String EX_NAME=TYPE+"_ex";//fanout_ex private static final String QUEUE01=TYPE+"_Q1"; private static final String QUEUE02=TYPE+"_Q2"; //声明三个组件 一个交换机 2个队列 @Test public void declare() throws IOException //声明队列 channel.queueDeclare(QUEUE01,false,false,false,null); channel.queueDeclare(QUEUE02,false,false,false,null); //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系 //声明交换机 channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机 //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意 channel.queueBind(QUEUE01,EX_NAME,"北京"); channel.queueBind(QUEUE01,EX_NAME,"广州"); channel.queueBind(QUEUE02,EX_NAME,"上海"); @Test public void send() throws IOException String msg="你好,路由模式交换机"; byte[] bytes = msg.getBytes(); channel.basicPublish(EX_NAME,"北京",null,bytes);
发布订阅
结构
生产端:发送消息到交换机
交换机:由于是发布订阅模式,会将这个消息发送同步到后端所有与其绑定的队列
消息端:简单模式 1个队列绑定一个消费者 争抢模式 1个队列绑定多个消费者
应用场景
邮件的群发,广告的群发
代码测试
package cn.tedu.test.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 一发多接的队列结构 */ public class FanoutMode //初始化连接 private Channel channel; @Before public void channelInit() throws IOException, TimeoutException //ip:port tedu/123456 /* 1.长链接工厂,提供4个属性 ip port tedu 123456 2.获取长连接 3.给成员变量channel赋值 */ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(15672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); channel=connection.createChannel(); //准备交换机,队列的名称属性 private static final String TYPE="fanout"; private static final String EX_NAME=TYPE+"_ex";//fanout_ex private static final String QUEUE01=TYPE+"_Q1"; private static final String QUEUE02=TYPE+"_Q2"; //声明三个组件 一个交换机 2个队列 @Test public void declare() throws IOException //声明队列 channel.queueDeclare(QUEUE01,false,false,false,null); channel.queueDeclare(QUEUE02,false,false,false,null); //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系 //声明交换机 channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机 //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意 channel.queueBind(QUEUE01,EX_NAME,""); channel.queueBind(QUEUE02,EX_NAME,""); @Test public void send() throws IOException String msg="你好,发布订阅模式"; byte[] bytes = msg.getBytes(); channel.basicPublish(EX_NAME,"北京",null,bytes);
主题模式
结构
交换机绑定队列,不在使用具体的路由key,可以使用符号代替路由key值的规则
#:表示任意多级的任意长度的字符串
*:任意长度字符串,但是只有一级
中国.北京.朝阳.望京.葫芦村
匹配到 中国.# 匹配到 中国.上海.# 匹配到 中国.*.*.* 匹配到 中国.*.朝阳.*.*
应用场景
实现多级传递的路由筛选工作,记录trace过程.
代码测试
package cn.tedu.test.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 主题模式 */ public class TopicMode //初始化连接 private Channel channel; @Before public void channelInit() throws IOException, TimeoutException //ip:port tedu/123456 /* 1.长链接工厂,提供4个属性 ip port tedu 123456 2.获取长连接 3.给成员变量channel赋值 */ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.91.151"); factory.setPort(5672); factory.setUsername("tedu"); factory.setPassword("123456"); Connection connection = factory.newConnection(); channel=connection.createChannel(); //准备交换机,队列的名称属性 private static final String TYPE="topic"; private static final String EX_NAME=TYPE+"_ex";//fanout_ex private static final String QUEUE01=TYPE+"_Q1"; private static final String QUEUE02=TYPE+"_Q2"; //声明三个组件 一个交换机 2个队列 @Test public void declare() throws IOException //声明队列 channel.queueDeclare(QUEUE01,false,false,false,null); channel.queueDeclare(QUEUE02,false,false,false,null); //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系 //声明交换机 channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机 //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意 channel.queueBind(QUEUE01,EX_NAME,"中国.北京.#"); channel.queueBind(QUEUE01,EX_NAME,"中国.*.*.*.*"); channel.queueBind(QUEUE02,EX_NAME,"*.上海.#"); @Test public void send() throws IOException String msg="你好,路由模式交换机"; byte[] bytes = msg.getBytes(); channel.basicPublish(EX_NAME,"中国.北京.大兴.亦庄",null,bytes);
SpringBoot整合rabbitmq
Fanout
使用springboot完成rabbitmq的消费-Fanout
整合业务逻辑图
实现步骤
-
创建Spring Initinlizr项目 --producer
勾选web+rabbitmq组件,编写yml配置
编写下单业务逻辑接口
@Component public class OrderService @Autowired private RabbitTemplate rabbitTemplate; private String exchangeName = "fanout_order_exchange"; private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
创建配置类完成队列和交换机,并完成绑定
@Configuration public class DirectRabbitConfig @Bean public Queue emailQueue() return new Queue("email.fanout.queue", true); @Bean public Queue smsQueue() return new Queue("sms.fanout.queue", true); @Bean public Queue weixinQueue() return new Queue("weixin.fanout.queue", true); @Bean public DirectExchange fanoutOrderExchange() return new DirectExchange("fanout_order_exchange", true, false); @Bean public Binding bindingDirect1() return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); @Bean public Binding bindingDirect2() return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); @Bean public Binding bindingDirect3() return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
编写一个发送消息的测试类
@SpringBootTest class SpringBootOrderRabbitmqProducerApplicationTests @Autowired OrderService orderService; @Test void contextLoads() throws InterruptedException for (int i = 0; i < 10; i++) Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrder(userId, productId, num);
-
创建Spring Initinlizr项目 --consumer
编写一个监听消息队列的接口,监视指定队列,并消费消息
@RabbitListener(queues = "weixin.fanout.queue") @Component public class EmailController @RabbitHandler public void messagerevice(String msg) System.out.println("邮件发送消息:"+msg);
@RabbitListener(queues = "sms.fanout.queue") @Component public class SMSController @RabbitHandler public void smsrevice(String msg) System.out.println("sms发送消息:"+msg);
@RabbitListener(queues = "weixin.fanout.queue") @Component public class WechatController @RabbitHandler public void messagerevice(String msg) System.out.println("微信发送消息:"+msg);
Direct
direct和fanout模式的区别
定义交换机的名字不同
绑定关系时添加了路由key
pull消息到queue时,指定了路由key
实现逻辑:
-
创建Spring Initinlizr项目 --comsumer
勾选web+rabbitmq组件,编写yml配置
编写下单业务逻辑接口
public void makeOrderDirect(String userId, String productId, int num) private String routeKey1 = "sms"; private String routeKey2 = "email"; private String DirectExchangeName = "direct_order_exchange"; // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(DirectExchangeName, routeKey1, orderNumer); rabbitTemplate.convertAndSend(DirectExchangeName, routeKey2, orderNumer);
创建配置类完成队列和交换机,并完成绑定
@Configuration public class DirectRabbitConfig @Bean public Queue emailQueueDirect() return new Queue("email.direct.queue", true); @Bean public Queue smsQueueDirect() return new Queue("sms.direct.queue", true); @Bean public Queue weixinQueueDirect() return new Queue("weixin.direct.queue", true); @Bean //区别1 public DirectExchange directOrderExchange() return new DirectExchange("direct_order_exchange", true, false); @Bean public Binding bindingDirect1Direct() return BindingBuilder.bind(weixinQueueDirect()).to(directOrderExchange()).with("weixin");//区别2 @Bean public Binding bindingDirect2Direct() return BindingBuilder.bind(smsQueueDirect()).to(directOrderExchange()).with("sms"); @Bean public Binding bindingDirect3Direct() return BindingBuilder.bind(emailQueueDirect()).to(directOrderExchange()).with("email");
编写一个发送消息的测试类
@Test void contextLoads1() throws InterruptedException orderService.makeOrderDirect("1","1",12); @Test void contextLoads2() throws InterruptedException orderService.makeOrderDirect("1","1",12);
-
创建Spring Initinlizr项目 --consumer
编写一个监听消息队列的接口,监视指定队列,并消费消息
@RabbitListener(queues = "weixin.direct.queue") @Component public class EmailControllerDirect @RabbitHandler public void emailDirectRevice(String msg) System.out.println("direct->>>>>>>>>邮件发送消息:"+msg);
@RabbitListener(queues = "sms.direct.queue") @Component public class SMSControllerDirect @RabbitHandler public void smsDirectRevice(String msg) System.out.println("direct->>>>>>>>>sms发送消息:"+msg);
@RabbitListener(queues = "weixin.direct.queue") @Component public class WechatControllerDirect @RabbitHandler public void emailDirectRevice(String msg) System.out.println("direct->>>>>>>>>微信发送消息:"+msg);
Topic
topic和direct区别
发送消息根据模糊路由匹配
没有定义配置类,绑定逻辑通过注解编写在消费端
实现逻辑
-
创建Spring Initinlizr项目 --comsumer
勾选web+rabbitmq组件,编写yml配置
编写下单业务逻辑接口
public void makeOrderTopic(String userId, String productId, int num) // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); /** * *.email.# * #.sms.# * com.# */ String topicExchangeName = "topic_order_exchange"; String routeKey = "com"; //输出:topic ->>>>>>>>>微信发送消息:7aefec2c-60da-404c-ba71-cce63839c74f // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(topicExchangeName, routeKey, orderNumer);
编写一个发送消息的测试类
@Test void contextLoads2Topic() throws InterruptedException orderService.makeOrderTopic("1","1",12);
-
创建Spring Initinlizr项目 --consumer
编写一个监听消息队列的接口,监视指定队列,并消费消息
@RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.topic.queue",autoDelete = "false",durable = "true"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "topic_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC),key = "*.email.#" )) @Component public class EmailControllerTopic @RabbitHandler public void emailTopicRevice(String msg) System.out.println("topic->>>>>>>>>邮件发送消息:"+msg);
@RabbitListener(bindings =@QueueBinding( value = @Queue(value = "sms.topic.queue",autoDelete = "false",durable = "true"), exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "#.sms.#" )) @Component public class SMSControllerTopic @RabbitHandler public void smsTopicRevice(String msg) System.out.println("topic->>>>>>>>>sms发送消息:"+msg);
@RabbitListener(bindings =@QueueBinding( value = @Queue(value = "weixin.topic.queue",autoDelete = "false",durable = "true"), exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "com.#" )) @Component public class WechatControllerTopic @RabbitHandler public void emailTopicRevice(String msg) System.out.println("topic ->>>>>>>>>微信发送消息:"+msg);
ttl过期时间
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
-
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
-
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
实现设置队列过期时间
配置类
@Configuration public class ttlRabbitmqConfig @Bean public Queue queue1() Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("ttl.queue",true,false,false,args); @Bean public DirectExchange ttlExchange() return new DirectExchange("ttl_order_exchange", true, false); @Bean public Binding bindingExchange() return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
业务层
public void makeOrderTtl(String userId, String productId, int num) String orderNumer = UUID.randomUUID().toString(); System.out.println("user:"+orderNumer); String routeKey = "ttl"; // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
测试类
@Test void contextLoads2Ttl() throws InterruptedException orderService.makeOrderTtl("1","1",12);
消费者监视类
@RabbitListener(queues = "ttl.queue") @Component public class ttlController @RabbitHandler public void ttlRevice(String msg) System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
实现设置消息过期机制
producer代码
@Bean public Queue messageQueue() return new Queue("message.queue",true); @Bean public DirectExchange messageOrderExchange() // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("message_order_exchange", true, false); @Bean public Binding bindingMessage() return BindingBuilder.bind(messageQueue()).to(messageOrderExchange()).with("message");
producer发送消息代码
public void ttlOrder(String userId, String productId, int num) String exchangeName = "message_order_exchange"; String routeKey = "message"; String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() @Override public Message postProcessMessage(Message message) throws AmqpException message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("utf-8"); return message; ; rabbitTemplate.convertAndSend(exchangeName, routeKey,userId, messagePostProcessor);
consumer消费者监听方法
@RabbitListener(queues = "message.queue") @Component public class messageController @RabbitHandler public void messageRevice(String msg) System.out.println("message->>>>消费消息");
生产者测试类发送消息
@Test void contextLoads1() orderService.ttlOrder("1"," 1",12);
死信队列案例
概念
当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。
消息被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度 当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。
生产者配置类
@Bean public Queue queue1() //做了参数的变更和消费不会失败,会报错 Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl",5000); args.put("x-dead-letter-exchange","dead_direct_exchange"); args.put("x-dead-letter-routing-key","dead"); return new Queue("ttl.queue",true,false,false,args); @Bean public DirectExchange ttlExchange() return new DirectExchange("ttl_order_exchange", true, false); @Bean public Binding bindingExchange() return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
生产者发送消息业务
//队列过期 public void makeOrderTtl(String userId, String productId, int num) String ttlExchangeName = "ttl_order_exchange"; String routeKey = "ttl"; String orderNumer = UUID.randomUUID().toString(); System.out.println("user:"+orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
生产者发送消息测试类
@Autowired private TtlService ttlService; @Test void contextLoads2Ttl() throws InterruptedException ttlService.makeOrderTtl("1","1",12);
消费者
@RabbitListener(queues = "ttl.queue") public class TtlController @RabbitHandler public void ttlRevice(String msg) System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
Rabbitmq分布式事务
美团业务架构图
系统间调用过程中事务回滚问题
订单服务
系统结构
order-service
entity: OrderDataBaseService pojo: Order mapper: OrderMapper service: OrderService test: OrderServiceApplicationTests
sql脚本
CREATE TABLE `order_service` ( `order_id` int(50) DEFAULT NULL, `user_id` int(50) DEFAULT NULL, `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL, `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
pom.xml
srpingboot-web+rabbitmq+mybatis+jdbc+mysql+org.apache.common+com.fasterxml.jackson.dataformat
application.properties
server.port=8082 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/cn_tedu_order?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true spring.datasource.username=root spring.datasource.password=root spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ mybatis.mapper-locations=classpath:mapper/*.xml mybatis.type-aliases-package=cn.tedu.orderservice.pojo # Mybatis显示sql语句输出的配置 logging.level.cn.tedu.mybatis.mapper=TRACE
order-service
entity: OrderDataBaseService
@Component public class OrderDataBaseService @Autowired private OrderMapper orderMapper; public int saveOrder(Order orderInfo) int i = orderMapper.saveOrder(orderInfo); return i;
mapper: OrderMapper
@Repository public interface OrderMapper @Insert("insert into order_service values(#orderId,#userId,#orderContent,#createTime)") int saveOrder(Order order);
pojo: Order
@Lombok public class Order private int orderId; private int userId; private String orderContent; private String createTime;
service: OrderService
@Service public class OrderService @Autowired private OrderDataBaseService orderDataBaseService; // 创建订单 @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务 public void createOrder(Order orderInfo) throws Exception // 1: 订单信息--插入丁订单系统,订单数据库事务 int i = orderDataBaseService.saveOrder(orderInfo); // 2:通過Http接口发送订单信息到运单系统 int id = orderInfo.getOrderId(); System.out.println("id:"+id); String result = dispatchHttpApi(id); if(!"User added successfully".equals(result)) throw new Exception("订单创建失败,原因是运单接口调用失败!"); /** * 模拟http请求接口发送,运单系统,将订单号传过去 springcloud * @return */ private String dispatchHttpApi(int orderId) /** * 情况1: 关闭远程服务:ConnectException: Connection refused: connect * */ SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); // 链接超时 > 3秒 factory.setConnectTimeout(3000); // 处理超时 > 2秒 factory.setReadTimeout(2000); // 发送http请求 String url = "http://localhost:8081/dispatcher/order?orderId="+orderId; RestTemplate restTemplate = new RestTemplate(factory);//异常 String result = restTemplate.getForObject(url, String.class); return result;
test: OrderServiceApplicationTests
@SpringBootTest class OrderServiceApplicationTests @Autowired private OrderService orderService; @Test void actionTest1() throws Exception SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String dateNow = sdf.format(new Date()); orderService.createOrder(new Order(0,0,UUID.randomUUID().toString(),dateNow));
配送中心
系统结构
dispacher-service
mapper: DispacherDao pojo: Dispacher service: DispacherService web: DispacherController
sql文件
CREATE TABLE `dispacher_service` ( `dispacher_id` int(50) DEFAULT NULL, `order_id` int(50) DEFAULT NULL, `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL, `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL, `user_id` int(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8 _bin;
mapper: DispacherDao
@Repository public interface DispatcherDao // @Insert("insert into dispacher_service values(#dispacherId,#orderId,#orderContent,#createTime,#userId)") int insertUser(Dispacher dispacher);
pojo: Dispacher
@Lombok public class Dispacher private int dispacherId; private int orderId; private String orderContent; private String createTime; private int userId;
service: DispacherService
@Service //@Transactional(rollbackFor = Exception.class) public class DispatcherService @Autowired private DispatcherDao dispatcherDao; public boolean dispatcher(int orderId) Dispacher dispacher = new Dispacher(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String dateNow = sdf.format(new Date()); dispacher.setCreateTime(dateNow); dispacher.setOrderId(orderId); dispacher.setDispacherId(orderId); dispacher.setOrderContent(UUID.randomUUID().toString()); dispacher.setUserId(orderId); if(dispacher==null) return false; else int i = dispatcherDao.insertUser(dispacher); System.out.println("影响行数:"+i); return true;
web: DispacherController
@RestController @RequestMapping("/dispatcher") public class DispatcherController @Autowired DispatcherService dispatcherService; @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务以上是关于RabbitMq入门案例的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)SpringAMQP发布订阅模式(FanoutExchangeDirectExchangeTopicExchange)消息转换器