springboot 2.X 集成RabbitMQ 详解topic 模式

Posted monco-sxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot 2.X 集成RabbitMQ 详解topic 模式相关的知识,希望对你有一定的参考价值。

Topic 模式
重点是理解交换器(exchange)、路由键(routing key)、队列名(queue name)三者之间的绑定关系。

  • topic 发送方:
    发送方 关注参数主要有三个 交换器(exchange) 路由键(routing key) 和 消息

  • topic 消费方
    消费方 关注点是队列的名字定义

  • topic 配置关系
    配置关系,主要关注于同一路由键下的 路由键和队列的 绑定关系

  • 重点使用的: 包含匹配规则 routing key 采用 . 分割 ,其中 “” 和 “#” 分别表达的意思是 其中“”用于匹配一个单词,“#”用于匹配多个单词
    例如:
    交换器名称(exchange name)为 A 队列名(queue name) 分别为 monco.xuzhou.man , monco.nanjing.man , sxy.xuzhou.woman , sxy.nanjing.woman
    现在我们需要给包含monco的所有队列,那么我们的路由键设置为 monco.#
    现在我们需要给包含sxy的所有xuzhou的队列,那么我们的路由键设置为 sxy.xuzhou.* 或者 sxy.xuzhou.##
    "#" 表示 可以配置之后的所有的,"*" 表示可以配置之后的一个

生产者代码如下:

package com.monco.sender;

import com.monco.config.RmConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author monco
 * @date 2020/4/15
 * @description: topic 发送者
 */
@Component
public class TopicSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {

        String msg1 = "I am email mesaage msg======";
        System.out.println("TopicSender send the 1st : " + msg1);
        this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_EMAIL, msg1);

        String msg2 = "I am user mesaages msg########";
        System.out.println("TopicSender send the 2nd : " + msg2);
        this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_USER, msg2);

        String msg3 = "I am error mesaages msg";
        System.out.println("TopicSender send the 3rd : " + msg3);
        this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, "errorkey", msg3);
    }
}

配置类代码如下:

    /**
     * 定义topic模式的交换器
     * exchange 交换器 名称  monco.topic.exchange
     * 队列名称  monco.topic.email  monco.topic.email
     * <p>
     * routing key 定义 monco.*.user
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(RmConst.EXCHANGE_TOPIC);
    }

    @Bean
    public Queue queueEmailMessage() {
        return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
    }

    @Bean
    public Queue queueUserMessage() {
        return new Queue(RmConst.QUEUE_TOPIC_USER);
    }

    @Bean
    public Queue queueAllMessage() {
        return new Queue(RmConst.QUEUE_TOPIC_All);
    }

    @Bean
    public Binding bindingEmailExchangeMessage() {
        return BindingBuilder
                .bind(queueEmailMessage())
                .to(topicExchange())
                .with("monco.topic.email");
    }

    @Bean
    public Binding bindingUserExchangeMessages() {
        return BindingBuilder
                .bind(queueUserMessage())
                .to(topicExchange())
                .with("monco.*.user");
    }

    @Bean
    public Binding bindingAllExchangeMessages() {
        return BindingBuilder
                .bind(queueAllMessage())
                .to(topicExchange())
                .with("monco.*.user");
    }

消费者代码如下:

package com.monco.receiver;

import com.monco.config.RmConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author monco
 * @date 2020/4/15
 * @description: topic email 接收者
 */
@Slf4j
@Component
public class TopicEmailReceiver {

    @RabbitHandler
    @RabbitListener(queues = "monco.topic.email")
    public void processEmail(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("processEmail receive" + new String(message.getBody()));
    }


    @RabbitHandler
    @RabbitListener(queues = "monco.topic.user")
    public void processUser(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), true,false);
        log.info("processUser receive" + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = "monco.topic.user")
    public void processUsera(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("processUser2 receive" + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = "monco.monco.user")
    public void processAll(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("processAll receive" + new String(message.getBody()));
    }
}

以上是关于springboot 2.X 集成RabbitMQ 详解topic 模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ——SpringBoot集成RabbitMQ

RabbitMQ——SpringBoot集成RabbitMQ

SpringBoot - SpringBoot集成RabbitMQ

SpringBoot | 第十二章:RabbitMQ的集成和使用

rabbitmq入门springboot集成rabbitmq

rabbitmq入门springboot集成rabbitmq