SpringBoot - SpringBoot集成RabbitMQ

Posted MinggeQingchun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot - SpringBoot集成RabbitMQ相关的知识,希望对你有一定的参考价值。

一、SpringBoot集成RabbitMQ

创建两个模块,一个命名springboot-send,一个命名springboot-receive

在两个工程的 pom.xml配置文件中引入AMQP依赖

<dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--添加AMQP的起步依赖,添加成功后就会自动引入RabbitMQ的依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

核心配置文件application.properties文件中

#配置RabbitMQ相关连接信息(单机版)
spring.rabbitmq.host=192.168.133.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
#连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.connection-timeout=0

#配置RabbitMQ相关连接信息(集群版)
#spring.rabbitmq.addresses=192.168.133.129:5672,192.168.133.130:5672
#spring.rabbitmq.username=root
#spring.rabbitmq.password=root

1、direct交换机

消息发送方

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置Direct类型一个队列
    @Bean
    public Queue directQueue()
        return new Queue("bootDirectQueue");
    

    //配置一个Direct类型的交换
    @Bean
    public DirectExchange directExchange()
        return new DirectExchange("bootDirectExchange");
    

    /**
     *  配置Direct类型一个队列和交换机的绑定
     * @param directQueue  需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @param directExchange  需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * direct 交换机
     * @param message
     */
    public void sendDirectMessage(String message) 
        /**
         * 发送消息
         * 参数 1 为交换机名
         * 参数 2 为RoutingKey
         * 参数 3 为我们的具体发送的消息数据
         */
        amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRoutingKey",message);
    
 amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    

消息接收方

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置一个Direct类型的交换
    @Bean
    public DirectExchange directExchange()

        return new DirectExchange("bootDirectExchange");
    
    //配置一个队列
    @Bean
    public Queue directQueue()

        return new Queue("bootDirectQueue");
    

    /**
     * 配置一个队列和交换机的绑定
     * @param directQueue  需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @param directExchange  需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    

2、fanout交换机

消息发送方

@Configuration
public class RabbitMQConfig 

    //配置一个 Fanout类型的交换
    @Bean
    public FanoutExchange fanoutExchange()
        return new FanoutExchange("fanoutExchange");
    

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * fanout 交换机
     * @param message
     */
    public void sendFanoutMessage(String message) 
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    

消息接收方

@Configuration
public class RabbitMQConfig 
    //创建一个名字为 fanoutQueue的队列
    @Bean
    public Queue fanoutQueue()
        return new Queue("fanoutQueue");
    
    
    //创建一个名字为 BootFanoutExchange的交换机
    @Bean
    public FanoutExchange fanoutExchange()
        return new FanoutExchange("BootFanoutExchange");
    

    @Bean
    public Binding  fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange)
       //将队列绑定到指定的交换机上
        //参数1 为指定的队列对象
        //参数2 为指定的交换机对象
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;


    @RabbitListener(bindings=
                            @QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
                                          value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                                          exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
                                          )
                            
                   )
    public void fanoutReceive01(String message)
        System.out.println("fanoutReceive01监听器接收的消息----"+message);
    

    @RabbitListener(bindings=
            @QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
                    value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                    exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
            )
    
    )
    public void fanoutReceive02(String message)
        System.out.println("fanoutReceive02监听器接收的消息----"+message);
    

3、topic交换机

消息发送方

@Configuration
public class RabbitMQConfig 
    //配置一个 Topic 类型的交换
    @Bean
    public TopicExchange topicExchange()
        return new TopicExchange("topicExchange");
    
@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * topic 交换机
     * @param message
     */
    public void sendTopicMessage(String message) 
        amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    

消息接收方

@Configuration
public class RabbitMQConfig 
    @Bean
    public Queue topicQueue()

        return new Queue("bootTopicQueue");
    

    //创建队列
    @Bean
    public Queue topicQueue2()
        return new Queue("topicQueue2");
    

    @Bean
    public TopicExchange topicExchange()
        return new TopicExchange("bootTopicExchange");
    

    @Bean
    public Binding topicBinding(Queue topicQueue,TopicExchange topicExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("boot");
    

    @Bean
    public Binding  topicBinding2(Queue topicQueue2,TopicExchange topicExchange)
        //将队列绑定到指定交换机
        //参数1 为指定队列对象
        //参数2 为指定的交换机对象
        //参数3 为RoutingKey的匹配规则,#.test表示 可以接收以任意路径靠头的但是必须以test结尾的队列
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.text");
    
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic01"),key = "aa",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive01(String message)
        System.out.println("topic01消费者 ---aa---"+message );
    

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic02"),key = "aa.*",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive02(String message)
        System.out.println("topic02消费者 ---aa.*---"+message );
    

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic03"),key = "aa.#",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive03(String message)
        System.out.println("topic03消费者 ---aa.#---"+message );
    

运行测试Send消息发送,编写Application.java类

@SpringBootApplication
public class RabbitmqSpringbootSendApplication 

    public static void main(String[] args) 
        ApplicationContext applicationContext = SpringApplication.run(RabbitmqSpringbootSendApplication.class, args);

        SendService sendService = (SendService) applicationContext.getBean("sendService");

//        sendService.sendDirectMessage("Boot的direct测试数据");
//        sendService.sendFanoutMessage("Boot的Fanout测试数据");
        sendService.sendTopicMessage("Boot的Topic测试数据,key为aa.bb.cc");
    

 运行测试Receive消息接收,编写Application.java类

@SpringBootApplication
public class RabbitmqSpringbootReceiveApplication 

    public static void main(String[] args) 

        ApplicationContext applicationContext =  SpringApplication.run(RabbitmqSpringbootReceiveApplication.class, args);
        ReceiveService service = (ReceiveService) applicationContext.getBean("receiveService");
        //使用了消息监听器接收消息那么就不需要调用接收方法来接收消息
//        service.receive();
    

以上是关于SpringBoot - SpringBoot集成RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot集成Kafka

SpringBoot.03.SpringBoot集成jsp

SpringBoot.03.SpringBoot集成jsp

SpringBoot.03.SpringBoot集成jsp

SpringBoot入门到精通-SpringBoot集成SSM开发项目

SpringBoot使用·下篇(SpringBoot集成MyBatis+日志打印+MyBatis-plus)