RabbitMQ的核心组成部分超详细
Posted MyAzhe0ci3
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的核心组成部分超详细相关的知识,希望对你有一定的参考价值。
RabbitMQ的核心组成部分
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
RabbitMQ的运行流程
RabbitMQ支持消息的模式
参考rabbitmq官网:https://www.rabbitmq.com/getstarted.html
1. 简单模式功能:
功能:一个生产者P发送消息到队列Q,一个消费者C接收
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。
生产者
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// 1:创建连接工程
ConnectionFactory connectionFactory=new ConnectionFactory();
//设置ip
connectionFactory.setHost("139.196.122.115");
//设置默认端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//消息发送地址 根目录下
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
// 2:创建连接获取通道Connection Rabbitmq为什么是基于channel去处理而不是connection ?长连接 ----信道channel 在高并发的情况,会创建多个通道(高性能)
connection=connectionFactory.newConnection("生产者");
// 3:通过连接获取通道Channel
channel=connection.createChannel();
// 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息
//设置队列名称
String queueName="queue1";
/*
* @params1 队列的名称
* @params2 是否持久化durable=false 所谓持久化消息是否存盘,如果false非持久化,true是持久化
* @params3 是否排他性,是否是独占队列
* @params4 是否真的删除,随着最后一个消费者消息完毕以后是否把队列自动删除
* @params5 携带附属参数
*/
channel.queueDeclare(queueName,false,false,false,null);
// 5:准备消息内容
String message="Hello World ";
// 6:发送消息给队列queue
/*
* @param1:交换机 @param2:队列,路由key @params3 消息的状态控制,@params4消息主题
* 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish("",queueName,null,message.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 7:关闭连接
if(channel!=null&& channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8:关闭通道
if(connection!=null&& connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消费者实现思路
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue, 创建消费者并监听队列,从队列中读取消息。
消费者
package com.newer.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// 1:创建连接工程
ConnectionFactory connectionFactory=new ConnectionFactory();
//设置ip
connectionFactory.setHost("139.196.122.115");
//设置默认端口
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//消息发送地址 根目录下
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
// 2:创建连接获取通道Connection
connection=connectionFactory.newConnection("生产者");
// 3:通过连接获取通道Channel
channel=connection.createChannel();
// 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery message) throws IOException {
System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收失败了...");
}
});
System.out.println("开始接收消息");
//对消息进行阻断不在往下运行
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 7:关闭连接
if(channel!=null&& channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8:关闭通道
// 7:关闭连接
if(connection!=null&& connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2. 工作队列模式Work Queue
功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列
任务队列:避免立即做一个资源密集型任务,必须等待它完成,而是把这个任务安排到稍后再做。我们将任务封装为消息并将其发送给队列。后台运行的工作进程将弹出任务并最终执行作业。当有多个worker同时运行时,任务将在它们之间共享。
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,2条消息之间间隔一定时间,关闭通道和连接。
消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,创建消费者C1并监听队列,获取消息并暂停10ms,另外一个消费者C2暂停1000ms,由于消费者C1消费速度快,所以C1可以执行更多的任务。
3. 发布/订阅模式Publish/Subscribe
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者
生产者:可以将消息发送到队列或者是交换机。
消费者:只能从队列中获取消息。
如果消息发送到没有队列绑定的交换机上,那么消息将丢失。
交换机不能存储消息,消息存储在队列中 我们可以通过两种方式实现图形化界面实现和代码实现
通过图形化界面实现
1.添加一个新的交换机,类型选择fanout
2.添加新的队列,因为需要多个队列进行操作
3.将队列跟交换机进行绑定
绑定成功,多绑定几个队列进行操作
4.发送消息
小结:只要消费者订阅了交换机,发送消息时所有订阅的消费者都能收到,这就是fanout模式
应用场景:微信,短信,发送邮件
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为fanout,使用通道向交换机发送消息,关闭通道和连接。
消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,多个消费者进行监听。
4. 路由模式Routing
说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key
图形化界面实现
路由模式说白了就是比消息订阅模式多了一个路由key,学过sql的同学,可以理解为where条件,通过路由key去将消息发送到指定的队列,比如我们需要给微信,短信,邮箱发送消息,但是经理说不用给邮箱发送消息了,那么我们就可以指定路由key,那么最后接收到消息的就是微信和短信
1.添加交换机
2.绑定队列并添加路由key,绑定后我们发送消息就可以不填写队列名直接通过路由key去发送了
只有添加了sms路由key的能收到消息
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为direct,使用通道向交换机发送消息并指定key=b,关闭通道和连接。
消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,但只要绑定key=b的队列key接收到消息,多个消费者进行监听。
5.主题模式Topics
功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为topic,使用通道向交换机发送消息并指定key=key.1,关闭通道和连接。
消费者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,凡是绑定规则符合通配符规则的队列均可以接收到消息,比如key.*,key.#,多个消费者进行监听。
小结:
rabbitmq发送消息一定有一个交换机
以上是关于RabbitMQ的核心组成部分超详细的主要内容,如果未能解决你的问题,请参考以下文章