(九)RabbitMQ交换机(Exchange)

Posted 小怪吖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(九)RabbitMQ交换机(Exchange)相关的知识,希望对你有一定的参考价值。

交换机Exchange

1、交换机

1.1. Exchanges 概念

RabbitMQ 消息传递模型的核心思想:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。

1.2. Exchanges 的类型

  • 直接(direct),
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)

1.3. 无名exchange(默认交换机)

也就是推送消息时,交换机的参数是 “ ”,这里用的就是默认的交换机

channel.basicPublish("", queueName, null, msg.getBytes());

2、临时队列

也就是队列名称随机产生,一旦我们断开了消费者的连接,队列将被自动删除,也没有持久化

//创建随机队列
String queueName = channel.queueDeclare().getQueue();

3、绑定(bindings)

概念 :binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。

4、Fanout(发布/订阅)

将接收到的所有消息广播到它绑定的所有队列中,也就是关于routingKey他不管。系统中油默认 exchange 类型

不管是生产者还是消费者声明的交换机,只要谁先启动就只有一个

生产者

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:26
 * @Version 1.0
 * @Description fanout模式生产者
 */
public class EmitLog 
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";`在这里插入代码片`

    public static void main(String[] args) throws IOException, TimeoutException 

        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext())
            String msg = scanner.next();
            //设置消息持久化
            channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        

    


消费者1

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:08
 * @Version 1.0
 * @Description 消息消费者1
 */
public class ReceiveLogs01 
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //声明一个临时队列
        /**
         * 1、队列名是随机取得
         * 2、连接断开之后队列就删除了
         */
        String queueName = channel.queueDeclare().getQueue();//返回队列名
        //交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> 
            System.out.println("01接收消息是:" + new String(message.getBody(), "UTF-8"));
        , consumerTag -> 
        );
    


消费者2

package com.feng.fanoutExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:08
 * @Version 1.0
 * @Description 消息消费者1
 */
public class ReceiveLogs02 
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //声明一个临时队列
        /**
         * 1、队列名是随机取得
         * 2、连接断开之后队列就删除了
         */
        String queueName = channel.queueDeclare().getQueue();//返回队列名
        //交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> 
            System.out.println("02接收消息是:" + new String(message.getBody(), "UTF-8"));
        , consumerTag -> 
        );
    


5、Direct exchange、

队列只对它绑定的交换机的消息感兴趣。也就是由routingKey决定交换机发送到哪个队列

代码实现

示意图

console队列

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:45
 * @Version 1.0
 * @Description direct 模式交换机消费者1
 */
public class ReceiveLogsDirect01 
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("console",false,false,false,null);
        //交换机绑定队列,这里因为是 direct,所以路由key得绑定
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");
        //接收消息
        channel.basicConsume("console", true, (String consumerTag, Delivery message) -> 
            System.out.println("console队列接收消息是:" + new String(message.getBody(), "UTF-8"));
        , consumerTag -> 
        );
    


disk队列

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:45
 * @Version 1.0
 * @Description direct 模式交换机消费者2
 */
public class ReceiveLogsDirect02 
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机,因为是fanout所以不用声明交换机也行
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("disk",false,false,false,null);
        //交换机绑定队列,这里因为是 direct,所以路由key得绑定
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        //接收消息
        channel.basicConsume("disk", true, (String consumerTag, Delivery message) -> 
            System.out.println("disk队列接收消息是:" + new String(message.getBody(), "UTF-8"));
        , consumerTag -> 
        );
    


发送方

package com.feng.directExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 20:58
 * @Version 1.0
 * @Description   direct 模式交换机生产者
 */
public class DirectLogs 
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException 

        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT );

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext())
            String msg = scanner.next();
            //这里想给谁发,就指定对应得RountingKey就行
            channel.basicPublish(EXCHANGE_NAME,"error", null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        

    


6、Topics

topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。比如说:“stock.usd.nyse”, “nyse.vmw”,有两个替换符,
*(星号) 可以代替一个单词
#(井号) 可以替代零个或多个单词

缺点:就是速度慢,需要匹配

特殊情况

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

代码实现

绑定情况

消费者1

package com.feng.topicExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 14:03
 * @Version 1.0
 * @Description Topic交换机消费者01
 */
public class ReceiveLogsTopic01 

    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";
    //队列名
    private static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
        //消费消息
        channel.basicConsume(QUEUE_NAME,true,(String consumerTag, Delivery message) -> 
            System.out.println(QUEUE_NAME+"接收消息是:" + new String(message.getBody(), "UTF-8"));
            System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
        , consumerTag -> );
    


消费者2

package com.feng.topicExchange;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 14:03
 * @Version 1.0
 * @Description Topic交换机消费者02
 */
public class ReceiveLogsTopic02 

    //交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";
    //队列名
    private static final String QUEUE_NAME = "Q2";

    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
        //消费消息
        channel.basicConsume(QUEUE_NAME,<

以上是关于(九)RabbitMQ交换机(Exchange)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ初学之一:exchange与queue的绑定

RabbitMQ headers Exchange

消息中间件——RabbitMQ理解Exchange交换机核心概念!

3.RabbitMQ 第一个程序

理解 RabbitMQ Exchange

Rabbitmq交换机(exchange)类型解释