SpringBoot整合RabbitMQ

Posted Ronaldo7

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。

1 整合RabbitMQ

1.1 RabbitMQ的相关概念

  • 组成部分
    • 队列(Queue)

    声明队列
    ```java
    @Bean
    public Queue addUserQueue() {
    return new Queue("demo-user-add");
    }

    ```
    • 交换机(Exchange)

    用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
    交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
    Direct

    direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

    • topic

      类似Director模式,但是更加灵活,可以根据通配符去寻找对应的exchange。
      • 匹配一个字符
        # 匹配多个字符
        Headers
        设置header attribute参数类型的交换机
        Fanout
        转发消息到所有绑定队列;
        消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
    代码声明一个exchange:
    java @Bean public TopicExchange demoTestTopicExchange() { return new TopicExchange("demoTestTopic"); }
    • 绑定(Binding)
      通过routing key声明exchange与queue之间关系的。从而确定了我这个msg发到哪个exchange上面
      。然后与exchange再路由到对应的queue上面,从而发给了对应的消费者
        @Bean
        public Binding addUserBinding() {
            return BindingBuilder.bind(addUserQueue()).to(addUserTopicExchange()).with("cn.com.user.add");
    
        }

    1.2 发送、接收消息

    1.2.1 发送消息

    ```java
    @Slf4j
    @Component
    public class UserMQSender {
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
    • 发送消息
    • @param exchangeNanme 队列名称
    • @param routingKey 路由key
    • @param msg 具体消息内容
    • @throws Exception
      */
      public void sendUserMQ(String exchangeNanme, String routingKey, String msg) throws Exception {
      log.info("向交换机:{},匹配规则:{}, 发送消息:{}", exchangeNanme, routingKey, msg);
      this.amqpTemplate.convertAndSend(exchangeNanme, routingKey, msg);
      }

}
##### 1.2.2 接收消息 * 通过注解的方式主动监听接收 > 声明我要监听哪个queue即可java
@RabbitListener(queues = "demo-user-add")
public void getMsg(String msg) throws Exception{
log.info("获取消息{}", msg);
User user = (User) JSONObject.toBean(JSONObject.fromObject(msg), User.class);
userService.addUser(user);
}
* 被动接收java
String data = (String) this.amqpTemplate.receiveAndConvert(queueName);
#### 1.3 模拟高并发取值java
@Test
public void testGetMsg() throws Exception{
ExecutorService service = Executors.newCachedThreadPool(); //创建一个线程池
final CountDownLatch beginCountDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
    Runnable runnable = new Runnable() {
        int index = 1;
        @Override
        public void run() {
            try {
                /**
                 *如果调用对象上的await()方法,那么调用者就会一直阻塞在这里,直到别人通过cutDown方法,将计数减到0,才可以继续执行。
                 * 这里先调用beginCountDownLatch的await方法,等到循环结束后,内存中就有100个线程等待去运行。
                 * 所以等到beginCountDownLatch调用countDown的时候,100个线程就开始执行
                 */
                beginCountDownLatch.await();
                log.info("------->index:{}", index);
                String data = (String) amqpTemplate.receiveAndConvert("demo-test");
                log.info("==============>消息n内容:{}", data);

                countDownLatch.countDown();
                index++;

            } catch (Exception e) {
                e.printStackTrace();
            }


        }
    };
    service.execute(runnable);
}
//释放主线程,之前声明的100个线程就开始执行
beginCountDownLatch.countDown();
//
countDownLatch.await();

}
```












































以上是关于SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

springboot rabbitmq整合

Springboot整合Rabbitmq(史上最详细)

SpringBoot整合RabbitMQ实现死信队列

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

SpringBoot整合RabbitMQ,实现消息发送和消费

SpringBoot系列5SpringBoot整合RabbitMQ