RabbitMQ学习 :主题交换机

Posted Stark_Tan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习 :主题交换机相关的知识,希望对你有一定的参考价值。

 

尽管直连交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— 主题交换机。

发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

* (星号) 用来表示一个单词.

# (井号) 用来表示任意数量(零个或多个)单词。

生产者代码:

public class Productor {
    public static void main(String[] args) throws IOException, TimeoutException {
        //配置rabbitmq服务器地址
       
ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("starktan");
        factory.setPassword("starktan");
        factory.setVirtualHost("/");
        //建立连接和通道
       
Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明一个主题交换机
       
channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        System.out.println("发送信息!");
        String message = "1.2.3";
        //发送routkey“1.2.3”
       
channel.basicPublish("topic", message, true, null, message.getBytes());
        channel.close();
        connection.close();
    }
}

 

消费者代码

package com.stark.example5;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //创建连接和通道
       
ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            final int cur = i;
            service.submit(new Runnable() {
                Channel channel = connection.createChannel();
                String queryname = channel.queueDeclare().getQueue();

                public void run() {
                    //创建队列消费者
                   
QueueingConsumer consumer = new QueueingConsumer(channel);
                    try {
                        switch (cur) {
                            case 0: //获取0开头的主题消息
                               
channel.queueBind(queryname, "topic", "1.#");
                            case 1: //获取3结尾的主题消息
                               
channel.queueBind(queryname, "topic", "#.3");
                            case 2: //获取2中间的主题消息
                               
channel.queueBind(queryname, "topic", "*.2.*");
                            default://获取4中间的主题消息
                               
channel.queueBind(queryname, "topic", "*.4.*");
                        }
                        channel.basicConsume(queryname, consumer);
                        while (true) {
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            System.out.println("线程 " + cur + " 获取到消息 " + message);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        service.shutdown();
    }
}

以上是关于RabbitMQ学习 :主题交换机的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习 (主题)

rabbitmq学习之路

RabbitMQ 中的主题交换与直接交换

RabbitMQ入门教程:主题交换机Topics

与 RabbitMQ 的主题交换歧义

RabbitMQ:具有主题交换的持久消息