SpringAMQP处理RabbitMQ的常用五种消息模型

Posted ITLepeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringAMQP处理RabbitMQ的常用五种消息模型相关的知识,希望对你有一定的参考价值。

1.AMQP

AMQP(Advanced Message Queuing Protocol)AMQP,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

2.配置环境

引入依赖:

<dependencies>
    <!--lombok依赖-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--spring amqp-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

在application.yml中添加RabbitMQ配置

spring:
  rabbitmq:
    host: 192.168.136.160
    port: 5672
    username: lepeng
    password: lepeng
    virtual-host: lepeng

3.SimpleQueue


简单模型,一个生产者,一个队列,一个消费者
消息发送:

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpPublusherTest {

    /**
     * 发消息:
     *  1、注入RabbitTemplate对象
     *  2、调用convertAndSend方法发送消息
     */

    @Autowired
    private RabbitTemplate template;

    //基础消息队列
    @Test
    public void testSendBaseMessage() {
        String queueName = "simple.queue"; //队列名称
        String message = "宝 你今天输液了嘛";
        template.convertAndSend(queueName,message); //队列名,消息
    }
}

发送完成后,可以去RabbitMQ中simple.queue里查看发送的消息<注意先不要启动消费者,消费者如果接收则查看不到了>

消息接收:
消费者需要保持一个开启状态接收信息,所以需要配置监听消息队列的监听器,注意监听器需要与启动类放在同一目录

@Component
public class MessageListener {

    /**
     * 业务方法:
     *   没有返回值,有参数(生产者发送的消息)
     *   参数类型和发送的消息参数类型一致
     */
    @RabbitListener(queues = "simple.queue")
    public void recBaseMessage1(String message) {
        System.out.println("获取基础消息:"+message);
        //Thread.sleep(5);
    }
}

启动消费者类即可!!!

4.WorkQueue


循环模拟大量信息:

    @Test
    public void testSendWorkMessage() {
        for (int i = 0; i < 10; i++) {
            String queueName = "simple.queue"; //队列名称
            String message = "宝 你今天输液了嘛";
            template.convertAndSend(queueName,message); //队列名,消息
        }
    }

配置两个消费者,也就是监听器

    @RabbitListener(queues = "simple.queue")
    public void recBaseMessage1(String message) {
        System.out.println("消费者1获取基础消息:"+message);
    }
    @RabbitListener(queues = "simple.queue")
    public void recBaseMessage2(String message) {
        System.out.println("消费者2获取基础消息:"+message);
    }

结果:

但这样消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。
我们可以配置一个能者多劳机制。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

5.发布/订阅


可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • C:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

6.Fanout(广播)


在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

广播模式主要实现了消息能发送给多个用户,因为队列中消息一次只能发送一个人,所以需要使用交换机去选择队列,发送给不同用户。

发送消息

    //交换机
    @Test
    public void testFanoutMessage() {
        String exchange = "lepengExchange";
        String message = "输的什么液、想你的夜";
        //交换机名称,null,消息
        template.convertAndSend(exchange,"",message);
    }

接收消息

    @RabbitListener(
           bindings = @QueueBinding(
                   value = @Queue(           //绑定队列
                           name = "fanout.queue1"
                   ),
                   exchange = @Exchange(
                           name = "lepengExchange",    //绑定交换机
                           type = ExchangeTypes.FANOUT
                   )
           )

    )
    public void recFanoutMessage(String message) throws InterruptedException {
        System.out.println("消费1-获取基础消息:"+message);
        Thread.sleep(200);
    }

    @RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(  //绑定队列
                            name = "fanout.quque2"
                    ),
                    exchange = @Exchange(  //绑定交换机
                            name = "lepengExchange",
                            type = ExchangeTypes.FANOUT  //交换机类型
                    )
            )
    )
    public void recFanoutMessage2(String message) throws InterruptedException {
        System.out.println("消费2-获取基础消息:"+message);
        Thread.sleep(200);
    }

7.Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

发送消息

    //发送Direct模式的消息
    @Test
    public void testDirectMessage() {
        String exchange = "exchange-direct";
        String routingKey = "fan" ;              //路由键
        String message = "你看这牢饭又香又甜";
        //交换机名称,路由键,消息
        template.convertAndSend(exchange,routingKey,message);
    }

接收消息

    //监听Direct消息
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            name = "direct.queue1"
                    ),
                    exchange = @Exchange(
                            name = "exchange-direct",
                            type = ExchangeTypes.DIRECT
                    ),
                    key = {"fan"}
            )
    )
    public void recDirectMessage(String message) throws InterruptedException {
        System.out.println("消费1-获取基础消息:"+message);
        Thread.sleep(200);
    }

8.Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

解释:

  • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
  • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

发送消息:

    //发送topic类型的消息
    @Test
    public void testTopicMessage() {
        String exchange = "exchange-topic";
        String routingKey = "china.news";
        String message = "吴签刑拘";
//        Map message = new HashMap<>();
//        message.put("id","123456");
//        message.put("name","lepeng");
        template.convertAndSend(exchange,routingKey,message);
    }

接收消息:

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            name = "topic-queue1"
                    ),
                    exchange = @Exchange(
                            name = "exchange-topic",
                            type = ExchangeTypes.TOPIC
                    ),
                    key = "*.news"
            )
    )
    //监听Topic类型消息
    public void recTopicMessage(String message) throws InterruptedException {
        System.out.println("消费1:"+message);
        Thread.sleep(200);
    }


注意:将消费者停止,去RabbitMQ的查看数据,可以看到我们传入的消息

经过base64解码我们可以看到

出现这种原因是因为:

Spring会把你发送的消息序列化,后进行一个base64的加密后放到队列当中。

默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

9.配置配置JSON转换器

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

并在启动类配置json转换的bean

@Bean
public MessageConverter jsonMessageConverter(){    
	return new Jackson2JsonMessageConverter();
}

测试:

监听器中修改参数为Map测试

10.源码

自行下载:https://gitee.com/le-peng/rabbit-mq-demo

以上是关于SpringAMQP处理RabbitMQ的常用五种消息模型的主要内容,如果未能解决你的问题,请参考以下文章

SpringAMQP整合RabbitMQ-五种工作模式Demo

SpringAMQP整合RabbitMQ-五种工作模式Demo

RabbitMQ

RabbitMQ

RabbitMQ学习笔记-p2(SpringAMQP)

RabbitMQ学习笔记-p2(SpringAMQP)