Rabbitmq的使用四_Java Client方式使用Topic模式

Posted 莹小草

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq的使用四_Java Client方式使用Topic模式相关的知识,希望对你有一定的参考价值。

Rabbitmq的使用四_Java Client方式使用Topic模式

1.官方文档地址:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

2.定义:topic和direct类似,也是将消息发送到RoutingKey和BindingKey相匹配的队列中,只不过可以模糊匹配。

RoutinKey为一个被“.”号分割的字符串

交换机和队列的绑定键也是一样的格式。

topic交换机发送给队列的消息,通过routingkey进行匹配。匹配上的队列都可以收到消息。

有两种重要的绑定键的特殊情况:

  * 号只能代替一个单词。
  # 号可以替换零个或多个单词。

 

 

                        图一                                                图二

如图二所示:

在这个例子中,我们将发送所有描述动物的信息。消息将与一个routingkey一起发送,routingkey由三个单词(两个点)组成。路由key中的第一个词将描述速度,第二个词描述颜色,第三个词描述物种:

<speed>.<colour>.<species>

Q1队列和交换机之间通过 *.orange.* 绑定键进行绑定。Topic交换机X和Q2的绑定键有两个,分别是 *.*.rabbit   和lazy.#

这些绑定可以总结为:

  Q1队列对所有橙色的动物都感兴趣。

  Q2队列想收到关于兔子的一切消息,以及关于懒惰动物的一切消息。

主题交换机功能强大,可以像其他交换机一样工作。

模拟以下routintkey来验证topic交换机的使用

quick.orange.rabbit,将被传递到两个队列。
lazy.orange.elephant 将被传递到两个队列
quick.orange.fox 只会去第一个队列
lazy.brown.fox 只去第二个队列
lazy.pink.rabbit 只会传递到第二个队列一次,即使它匹配两个绑定
quick.brown.fox 不匹配任何绑定,因此将被丢弃
orange 或者 quick.orange.male.rabbit   这些消息不会匹配任何绑定,将会丢失
lazy.orange.male.rabbit 即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。 

 

代码案例:

1.创建一个生产者

public class RabbitMQtopicSender {

    // 创建一个topic交换机
    private static final String topic_EXCHANGE_NAME = "topic_exchange_name";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取一个rabbitmq的连接
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.创建一个通道
        Channel channel = rabbitMQConnections.createChannel();
        // 3. 声明一个topic交换机
        channel.exchangeDeclare(topic_EXCHANGE_NAME, "topic");
        // 4.向交换机发送一条消息routingkey 为:quick.orange.rabbit
        channel.basicPublish(topic_EXCHANGE_NAME, "quick.orange.rabbit", null, "quick.orange.rabbit========1=======>根据绑定键,将被传递到两个队列".getBytes());
        // 向交换机发送一条消息routingkey 为:quick.orange.rabbit
        channel.basicPublish(topic_EXCHANGE_NAME, "lazy.orange.elephant", null, "lazy.orange.elephant========2=======>根据绑定键,将被传递到两个队列".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "quick.orange.fox", null, "quick.orange.fox=====3=======>根据绑定键,将被传递到Q1队列".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "lazy.brown.fox", null, "lazy.brown.fox=======4=======>根据绑定键,将被传递到Q2队列".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "lazy.pink.rabbit", null, "lazy.pink.rabbit=====5======>只会传递到第二个队列一次,即使它匹配两个绑定".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "quick.brown.fox", null, "quick.brown.fox===6=========>匹配不到任何队列,被丢弃".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "orange", null, "orange=======7========>匹配不到任何队列,被丢弃".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "quick.orange.male.rabbit", null, "quick.orange.male.rabbit=======8======>匹配不到任何队列,被丢弃".getBytes());
        channel.basicPublish(topic_EXCHANGE_NAME, "lazy.orange.male.rabbit", null, "lazy.orange.male.rabbit=======9=======>匹配队列2".getBytes());
        System.out.println("消息发送成功");
        // 5.发送完消息后关闭连接
        channel.close();
        rabbitMQConnections.close();
    }
}

创建消费者1

public class RabbitMQtopicReceiver {
    // 创建一个topic交换机
    private static final String topic_EXCHANGE_NAME = "topic_exchange_name";
    // 创建队列一
    private static final String topic_QUEUE_NUM_ONE = "topic_queue_num_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.获取通道
        Channel channel = rabbitMQConnections.createChannel();
        // 3.声明交换机
        channel.exchangeDeclare(topic_EXCHANGE_NAME, "topic");
        // 4.生命队列
        channel.queueDeclare(topic_QUEUE_NUM_ONE, false, false, false, null);
        // 交换机和队列匹配在一起
        channel.queueBind(topic_QUEUE_NUM_ONE, topic_EXCHANGE_NAME, "*.orange.*");
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                StringBuilder builder = new StringBuilder();
                builder.append(new Date()).append("---").append("消费者1接收到的消息==========>").append(message);
                System.out.println(builder.toString());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 4.设置手动应答
        channel.basicConsume(topic_QUEUE_NUM_ONE, false, deliverCallback, consumerTag -> {
        });
    }
}

消费者2

public class RabbitMQtopicReceiver2 {
    // 创建一个topic交换机
    private static final String topic_EXCHANGE_NAME = "topic_exchange_name";
    // 创建队列一
    private static final String topic_QUEUE_NUM_TWO = "topic_queue_num_two";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.获取通道
        Channel channel = rabbitMQConnections.createChannel();
        // 3.声明交换机
        channel.exchangeDeclare(topic_EXCHANGE_NAME, "topic");
        // 声明队列
        channel.queueDeclare(topic_QUEUE_NUM_TWO, false, false, false, null);
        // 交换机和队列匹配在一起
        channel.queueBind(topic_QUEUE_NUM_TWO, topic_EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(topic_QUEUE_NUM_TWO, topic_EXCHANGE_NAME, "lazy.#");
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                StringBuilder builder = new StringBuilder();
                builder.append(new Date()).append("---").append("消费者2接收到的消息==========>").append(message);
                System.out.println(builder.toString());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 4.设置手动应答
        channel.basicConsume(topic_QUEUE_NUM_TWO, false, deliverCallback, consumerTag -> {
        });
    }
}

执行结果如下:

图三是消费者1接收到的消息,图四是消费者2收到的消息

                                  图三

                                    图四

从运行结果可以看出。交换机中的消息,根据routingkey模糊匹配,进行分发到了不同的队列。

  

以上是关于Rabbitmq的使用四_Java Client方式使用Topic模式的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 qpid-jms-client 创建 RabbitMQ 持久队列?

Java使用RabbitMQ之公平分发

com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)

RabbitMQ简单Java示例——生产者和消费者

消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)

rabbitmq at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.4.3.jar:5.4.3] 错误