rabbitMq与spring boot搭配实现监听

Posted 只为成功找方法,不为失败找借口

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMq与spring boot搭配实现监听相关的知识,希望对你有一定的参考价值。

  在我前面有一篇博客说到了rabbitMq实现与zk类似的watch功能,但是那一篇博客没有代码实例,后面自己补了一个demo,便于理解。demo中主要利用spring boot的配置方式,

一、消费者(也就是watcher)配置

配置都采用spring的注解进行配置

1、创建连接

  @Bean
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
     //设置rabbitMq的ip和端口 connectionFactory.setAddresses(
"127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }

2、创建交换机

    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange("ex_rabbit_test");
    }

创建一个名为ex_rabbit_test的交换机,交换机的类型为广播类型(为了实现消息的广播)

3、创建队列,并绑定到交换机上

    @Bean
    public Queue queueOne() {
        return new Queue("queue_one", false, false, true);
    }

    @Bean
    public Binding bindingOne(Queue queueOne, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueOne)
                .to(fanoutExchange);
    }

每一个消费者有自己的队列,只消费自己队列的消息;将队列和交换机绑定之后,交换机会将生产者发出的消息放到所有绑定的队列中,但是仅限广播模式,其它模式会按照一定的路由规则进行消息路由,比如topic类型的交换机会按照routingKey路由消息。

注意:在广播模式中,为了实现消息监听,每个消费者需要各自起一个队列,而且队列名不相同,比如现在有另外一个消费者:

    @Bean
    public Queue queueTwo() {
        return new Queue("queue_two", false, false, true);
    }

    @Bean
    public Binding BingdingTwo(Queue queueTwo, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueTwo)
                .to(fanoutExchange);
    }

如此一来,当生产者将消息发到交换机ex_rabbit_test中时,交换机就将消息发给queue_one和queue_two两个队列中,两个消费者分别取两个队列的消息进行消费。

4、消费消息

    @Bean
    public SimpleMessageListenerContainer execMessageContainerOne() {
     //设置监听者“容器” SimpleMessageListenerContainer container
= new SimpleMessageListenerContainer(createConnectionFactory());
     //设置队列名 container.setQueueNames(
"queue_one");
     //设置监听者数量,即消费线程数 container.setConcurrentConsumers(
1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer one"; consumerService.doProcess(usr, msg);//消费消息 } catch(Exception e) { e.printStackTrace(); } } }); return container; } @Bean public SimpleMessageListenerContainer execMessageContainerTwo() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory()); container.setQueueNames("queue_two"); container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) ->{ byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer two"; consumerService.doProcess(usr, msg);//消费消息 } catch (Exception e) { e.printStackTrace(); } } }); return container; }

consumerService提供消费消息的服务,执行如下方法

    public void doProcess(String usr, String msg) {
        System.out.println(usr + " receive message from producer:" + msg);
    }

二、生产者配置

1、与消费者相同的方式建立rabbitMq的连接

2、与消费者相同的方式设置交换机,交换机名称也为ex_rabbit_test(如果rabbitmq中已经存在这个交换机,可以不用创建)

3、关于是否建立队列以及将队列与交换机绑定,我的理解是这样的:

  如果在生产者的代码里面建立队列并将其与交换机绑定,那么就必须建立所有的消费者的队列,并将所有队列与交换机绑定,如果这样做,消费者中就可以省掉这个配置。事实上,这样做是有点得不偿失的,我不赞同这样做,这里只是说明这样做也可以达到目的。

4、创建rabbit模板(org.springframework.amqp.rabbit.core.RabbitTemplate)

    @Bean
    public RabbitTemplate rabbitTemplateProducer() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(this.createConnectionFactory());
        rabbitTemplate.setExchange("ex_rabbit_test");
        return rabbitTemplate;
    }

5、实现消息发送

  demo中使用spring web的方式启动消息发送,下面是controller和service的代码

@Controller
@RequestMapping(value="/index")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    @RequestMapping(value = "/send")
    @ResponseBody
    public String sendMsg(@RequestParam String msg) {
        producerService.send(msg);
        return "Success";
    }
}
@Service
public class ProducerService {

    @Resource(name = "rabbitTemplateProducer")
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        String message = "Hello, consumer.Sending:" + msg;
        rabbitTemplate.convertAndSend(message);
    }
}

三、pom文件

在consumer中只需要引入spring ampq的依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

在prudocer中需要引入spring ampq的依赖,另外由于是启动了web 项目,所以需要spring web的依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

四、启动项目和测试结果

使用spring boot可以快速启动项目,首先,在8882端口上启动producer,然后启动consumer。通过在controller中定义的访问地址http://localhost:8882/index/send?msg=hello everybody(此处的msg必须有,因为@RequestParam注解),可以看到两个消费者都消费了这条消息

Consumer one receive message from producer:Hello, consumer.Sending:hello everybody
Consumer two receive message from producer:Hello, consumer.Sending:hello everybody

从rabbitMq的后台(http://localhost:15672  usrname:guest  pasword:guest)可以看到刚才创建的交换机和队列。

当消费者变多,或者为了代码的统一管理,每个消费者的代码需要相同,为了实现广播需求,需要为每个消费者设置不同的队列名。这种情况下,可以采用UUID的方式,每个消费者可以创建一个唯一的随机队列名。UUID方式创建队列名的代码可以在ampq的jar包中找到org.springframework.amqp.core.AnonymousQueue

     public String generateName() {
            UUID uuid = UUID.randomUUID();
            ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
            bb.putLong(uuid.getMostSignificantBits())
              .putLong(uuid.getLeastSignificantBits());
            // TODO: when Spring 4.2.4 is the minimum Spring Framework version, use encodeToUrlSafeString() SPR-13784.
            return this.prefix + Base64Utils.encodeToString(bb.array())
                                    .replaceAll("\\+", "-")
                                    .replaceAll("/", "_")
                                    // but this will remain
                                    .replaceAll("=", "");
        }

可以将UUID方法的返回值加在固定队列名的后面,这样就生成了一个唯一的随机队列名。关于UUID的描述可以自行百度。

ps:前段时间看了spring cloud,看到其中的一个工具,spring cloud bus也可以用作消息监听,细察之后发现,spring cloud bus也是封装了rabbitMq,实现了消息队列。





以上是关于rabbitMq与spring boot搭配实现监听的主要内容,如果未能解决你的问题,请参考以下文章

[Spring boot] Spring boot 整合RabbitMQ实现通过RabbitMQ进行项目的连接

Spring Boot (十三): Spring Boot 整合 RabbitMQ

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

详细介绍Spring Boot + RabbitMQ实现延迟队列

java——spring boot集成RabbitMQ——路由模式——实现生产者

spring boot Rabbitmq集成,延时消息队列实现