Exchanges交换机

Posted 杀手不太冷!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Exchanges交换机相关的知识,希望对你有一定的参考价值。

Exchanges交换机

概念

生产者生产的消息从不会直接发送到队列中,生产者生产的消息会先发送到交换机中,具体这个消息会发送到哪个队列中,这是交换机应该做的事情。

交换机的工作内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把它们发送到许多队列中,再或者是把这些消息直接丢弃。不同类型的交换机,处理消息的方法也不同。

交换机类型

交换机一共有以下几种类型,第一种类型是直接(direct)类型,第二种类型是主题(topic)类型,第三种类型是扇出(fanout)类型,第四种类型是标题(headers)类型。

无名exchange交换机

在本教程的前面部分我们对exchange一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换机,我们通过空字符串("")进行标识,如下图:

上面的方法中的第一个参数是交换机的名称。空字符串标识默认或无名交换机:消息能路由发送到队列中其实是由routingKey(bindingKey)绑定key指定的,之前没有指定交换机的时候,这个key默认就是我们的队列的名字,但是如果我们指定了具体类型的交换机,那么我们绑定的key就不能使用默认值了,我们必须要指定一下。

绑定(bindings)

绑定其实就是把交换机和队列绑定在一起,可以把交换机和一个队列绑定在一起,也可以把交换机和多个队列绑定在一起,那么你生产者发消息的时候也比较自由了,你可以通过交换机只给一个队列发消息,你也可以通过交换机给一群队列发消息。

临时队列

临时队列就是临时生成的队列,当消费者和生产者宕机之后,临时队列也会随机消失,生成临时队列的代码如下图:

Fanout扇出类型交换机(也叫作发布订阅模式)

概念

Fanout类型的交换机会把生产者生产的消息广播到它知道的所有队列中。

有这样一个测试场景: 目前有一个发送者,和两个接收者,我们想要实现的功能是,当这一个发送者发送一个消息之后,这条消息会被发送到这两个接收者中。那么怎样实现呢?就是,我们需要在发送者里面把消息发送到Fanout扇出类型的交换机里面,然后对于两个接收者而言,每个接收者都需要自己创建一个临时的队列,然后把这个临时队列和交换机绑定在一起,这样交换机就能同时往两个临时队列里面发送消息,其实也就是当发送者发送一个消息之后,会首先发送到交换机里面,然后这个交换机会把这个消息同时发送到两个临时队列中。这样也就实现了”一个发送者把消息发送到两个接收者里面“的这个功能。

两个接收者代码

首先来看一下接收者的代码,如下图:

/**
 * @Date 2021/11/12 11:22
 * @Author 望轩
 *
 * 消息的接收
 */
public class ReceiveLogs01 {
    //交换机的名字
    public static final String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //声明一个临时队列,队列的名称是随机的,当消费者断开与队列的连接的时候,队列就自动删除
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机与队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把收到的消息打印在屏幕上....");

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println("ReceiveLogs01在控制台打印接收到的消息:"+new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

    }
}

接收者2的代码和这个一样,它也是创建一个临时队列,然后把队列和交换机绑定在一起。

发送者代码

/**
 * @Date 2021/11/12 11:36
 * @Author 望轩
 *
 * 发消息给交换机
 */
public class EmitLog {
    //交换机的名字
    public static final String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //从控制台当中接收信息
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()){
            String message=scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("生产者发送的消息是:"+message);
        }
    }
}

测试

两个接收者都成功接收到了信息“AA”,如下图:

Direct直接交换机类型

概念

直接交换机类型其实是在Fanout扇出交换机类型的基础上做的一个延伸,什么意思呢?就是在Fanout扇出交换机类型中,发送者会把消息发送给所有和交换机绑定的队列中,有几个队列就发送几个队列。但是在使用Direct直接交换机类型时,即便你的交换机和所有的队列都绑定了,但是交换机也不会把你的信息发送到所有的队列中,为什么呢?因为在交换机和队列绑定的时候交换机会给队列加上一个标识,然后生产者向交换机中发送消息的时候,会告诉交换机一个标识,这样交换机就会通过这个标识找到对应的队列,然后只把消息发送到有这个标识的队列。

Direct直接交换机类型,其实也叫作路由器交换机类型,就是交换机会把消息发送给指定的队列,而不会发送给所有的队列。

这就好比是一个人在群里发了一条消息,虽然所有人都能看见,但是这条消息真正发送的人是这个发送者@的那几个人。

下面用一个例子来演示Direct直接交换机类型,就是,我们会写两个接收者,一个发送者。这两个接收者中都会生成一个消息队列,然后这个消息队列都会和Direct类型的交换机进行绑定,但是绑定的标识不一样,然后我们用一个发送者往交换机中发送消息,发送消息的时候指定一个标识,最后我们的交换机不会把这条消息发送到所有与它绑定的队列中,而是把这条消息发送到在队列和交换机绑定的时候存在对应标识的队列中。

两个接收者代码

/**
 * @Date 2021/11/12 13:50
 * @Author 望轩
 */
public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare("console", false, false, false, null);

        channel.queueBind("console",EXCHANGE_NAME,"info");

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println("ReceiveLogsDirect01接收者接收到的消息:"+new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume("console",true,deliverCallback,cancelCallback);
    }
}

另外一个接收者代码如下:

/**
 * @Date 2021/11/12 13:57
 * @Author 望轩
 */
public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare("disk", false, false, false, null);

        channel.queueBind("disk",EXCHANGE_NAME,"error");

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println("ReceiveLogsDirect02接收者接收到的消息:"+new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume("disk",true,deliverCallback,cancelCallback);
    }
}

发送者代码

/**
 * @Date 2021/11/12 13:59
 * @Author 望轩
 */
public class DirectLogs {
    //交换机的名字
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //从控制台当中接收信息
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()){
            String message=scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes());
            System.out.println("生产者发送的消息是:"+message);
        }
    }
}

测试

发送者向交换机发送消息,如下图:

两个接收者接收到消息的情况:

以上是关于Exchanges交换机的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ交换机(Exchanges)

RabbitMQ-Exchanges交换机

(九)RabbitMQ交换机(Exchange)

RabbitMQ - 交换机

RabbitMQ基础总结

RabbitMQ基础总结