RabbitMQ的核心组成部分超详细

Posted MyAzhe0ci3

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的核心组成部分超详细相关的知识,希望对你有一定的参考价值。

RabbitMQ的核心组成部分


核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server

Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。

Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange

Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)

Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.

Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ的运行流程

RabbitMQ支持消息的模式




参考rabbitmq官网:https://www.rabbitmq.com/getstarted.html

1. 简单模式功能:

功能:一个生产者P发送消息到队列Q,一个消费者C接收
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。

生产者

public class Producer 
    public static void main(String[] args) 
        //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // 1:创建连接工程
        ConnectionFactory   connectionFactory=new ConnectionFactory();
        //设置ip
        connectionFactory.setHost("139.196.122.115");
        //设置默认端口
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //消息发送地址 根目录下
        connectionFactory.setVirtualHost("/");
        Connection connection=null;
        Channel channel=null;
        try 
            // 2:创建连接获取通道Connection  Rabbitmq为什么是基于channel去处理而不是connection ?长连接 ----信道channel 在高并发的情况,会创建多个通道(高性能)
            connection=connectionFactory.newConnection("生产者");
            // 3:通过连接获取通道Channel
            channel=connection.createChannel();
            // 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息
            //设置队列名称
            String queueName="queue1";

             /*
              * @params1 队列的名称
              * @params2 是否持久化durable=false 所谓持久化消息是否存盘,如果false非持久化,true是持久化
              * @params3 是否排他性,是否是独占队列
              * @params4 是否真的删除,随着最后一个消费者消息完毕以后是否把队列自动删除
              * @params5 携带附属参数
              */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5:准备消息内容
            String message="Hello World ";
            // 6:发送消息给队列queue
            /*
             * @param1:交换机 @param2:队列,路由key @params3 消息的状态控制,@params4消息主题
             * 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机
             */
            channel.basicPublish("",queueName,null,message.getBytes());

         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        finally 
            // 7:关闭连接
            if(channel!=null&& channel.isOpen())
                try 
                    channel.close();
                 catch (IOException e) 
                    e.printStackTrace();
                 catch (TimeoutException e) 
                    e.printStackTrace();
                
            
            // 8:关闭通道
            if(connection!=null&& connection.isOpen())
                try 
                    connection.close();
                 catch (Exception e) 
                    e.printStackTrace();
                
            
        


    

消费者实现思路

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue, 创建消费者并监听队列,从队列中读取消息。

消费者

package com.newer.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer 
    public static void main(String[] args) 
        //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // 1:创建连接工程
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置ip
        connectionFactory.setHost("139.196.122.115");
        //设置默认端口
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //消息发送地址 根目录下
        connectionFactory.setVirtualHost("/");
        Connection connection=null;
        Channel channel=null;
        try 
            // 2:创建连接获取通道Connection
            connection=connectionFactory.newConnection("生产者");
            // 3:通过连接获取通道Channel
            channel=connection.createChannel();
            // 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息
            channel.basicConsume("queue1", true, new DeliverCallback() 
                public void handle(String s, Delivery message) throws IOException 
                    System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
                
            , new CancelCallback() 
                public void handle(String s) throws IOException 
                    System.out.println("接收失败了...");
                
            );
            System.out.println("开始接收消息");
            //对消息进行阻断不在往下运行
            System.in.read();
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        finally 
            // 7:关闭连接
            if(channel!=null&& channel.isOpen())
                try 
                    channel.close();
                 catch (IOException e) 
                    e.printStackTrace();
                 catch (TimeoutException e) 
                    e.printStackTrace();
                
            
            // 8:关闭通道
            // 7:关闭连接
            if(connection!=null&& connection.isOpen())
                try 
                    connection.close();
                 catch (Exception e) 
                    e.printStackTrace();
                
            
        


    


2. 工作队列模式Work Queue

功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

任务队列:避免立即做一个资源密集型任务,必须等待它完成,而是把这个任务安排到稍后再做。我们将任务封装为消息并将其发送给队列。后台运行的工作进程将弹出任务并最终执行作业。当有多个worker同时运行时,任务将在它们之间共享。

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,2条消息之间间隔一定时间,关闭通道和连接。

消费者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,创建消费者C1并监听队列,获取消息并暂停10ms,另外一个消费者C2暂停1000ms,由于消费者C1消费速度快,所以C1可以执行更多的任务。

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

轮询模式

生产者

package com.newer.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author: 学相伴-飞哥
 * @description: Producer 简单队列生产者
 * @Date : 2021/3/2
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("139.196.122.115");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) 
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue5", null, msg.getBytes());
            
            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

消费者1

package com.newer.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author:楚风
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work1 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("139.196.122.115");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
          //  finalChannel.basicQos(1);
            finalChannel.basicConsume("queue5", false, new DeliverCallback() 
                @Override
                public void handle(String s, Delivery delivery) throws IOException 
                    try
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    catch(Exception ex)
                        ex.printStackTrace();
                    
                
            , new CancelCallback() 
                @Override
                public void handle(String s) throws IOException 
                
            );
            System.out.println("Work1-开始接受消息");
            System.in.read();
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null && connection.isOpen()) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

消费者2

package com.newer.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author: 楚风
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work2 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("139.196.122.115");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work2");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的

以上是关于RabbitMQ的核心组成部分超详细的主要内容,如果未能解决你的问题,请参考以下文章

超详细的RabbitMQ发布订阅模式讲解

RabbitMQ超详细安装教程(Linux)

RabbitMQ超详细安装教程(Linux)

RabbitMq消息可靠性之确认模式 通俗易懂 超详细 内含案例

RabbitMQ 超详细入门篇

RabbitMQ超详细学习笔记(章节清晰+通俗易懂)