springboot集成rabbitmq:fanouttopic

Posted 做一道光

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot集成rabbitmq:fanouttopic相关的知识,希望对你有一定的参考价值。

编写Fanout模式的消息接收

其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501

ReceiveServiceImpl实现类

package com.it.rabbitmq.impl;

import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;


    @Override
    public void receiveMessage() 
        /**
         * 发送消息
         * 参数1为交换机名称
         * 参数2位RoutingKey
         * 参数3为具体发送的消息数据
         */
        String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
        System.out.println(message);
    

    /**
     * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
     * 这个方法不需要手动调用,spring会自动监听
     * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
     * @param message  参数就是接收到的具体消息数据
     */
    @RabbitListener(queues = "bootDirectQueue")
    public void directReceive(String message) 
        System.out.println("监听器接收的消息:"+message);
    

    @RabbitListener(bindings = 
                                 @QueueBinding(value = @Queue(),
                                                exchange = @Exchange(name="fanoutExchange",type = "fanout")
                                              ))
    public void fanoutReceive1(String message) 
        System.out.println("fanoutReceive1监听器接收的消息:"+message);
    


    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(),
                    exchange = @Exchange(name="fanoutExchange",type = "fanout")
            ))
    public void fanoutReceive2(String message) 
        System.out.println("fanoutReceive2监听器接收的消息:"+message);
    


 

 

编写Fanout模式的消息接收

1.SendService接口

package com.it.rabbitmq;

public interface SendService 
    void sendMessage(String message);
    void sendFanoutMessage(String message);

SendServiceImpl实现类

package com.it.rabbitmq.impl;

import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String message) 
        /**
         * 发送消息
         * 参数1为交换机名称
         * 参数2位RoutingKey
         * 参数3为具体发送的消息数据
         */
        amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
    

    @Override
    public void sendFanoutMessage(String message) 
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    

2.RabbitMQConfig类

package com.it.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置一个direct类型的交换机
    @Bean
    public DirectExchange directExchange() 
        return new DirectExchange("bootDirectExchange");

    

    //配置一个队列
    @Bean
    public Queue directQueue() 
        return new Queue("bootDirectQueue");
    

    /**
     * 配置一个交换机和队列的绑定
     *
     * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
     * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue, DirectExchange directExchange) 
        //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
    

    //配置一个fanout类型的交换机
    @Bean
    public FanoutExchange fanoutExchange() 
        return new FanoutExchange("fanoutExchange");
    



3.运行主函数

package com.it;

import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class RabbitmqSpringbootApplication 

    public static void main(String[] args) 

        ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
        SendService sendService = (SendService) ac.getBean("sendService");
//        sendService.sendMessage("boot的测试数据");

        sendService.sendFanoutMessage("boot的fanout测试数据!");

    


 

 

 编写Topic模式消息接收

ReceiveServiceImpl实现类

package com.it.rabbitmq.impl;

import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;


    @Override
    public void receiveMessage() 
        /**
         * 发送消息
         * 参数1为交换机名称
         * 参数2位RoutingKey
         * 参数3为具体发送的消息数据
         */
        String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
        System.out.println(message);
    

    /**
     * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
     * 这个方法不需要手动调用,spring会自动监听
     * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
     * @param message  参数就是接收到的具体消息数据
     */
    @RabbitListener(queues = "bootDirectQueue")
    public void directReceive(String message) 
        System.out.println("监听器接收的消息:"+message);
    

    @RabbitListener(bindings = 
                                 @QueueBinding(value = @Queue(),
                                                exchange = @Exchange(name="fanoutExchange",type = "fanout")
                                              ))
    public void fanoutReceive1(String message) 
        System.out.println("fanoutReceive1监听器接收的消息:"+message);
    


    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(),
                    exchange = @Exchange(name="fanoutExchange",type = "fanout")
            ))
    public void fanoutReceive2(String message) 
        System.out.println("fanoutReceive2监听器接收的消息:"+message);
    

    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue("topic1"),
                          key = "aa",
                          exchange = @Exchange(name="topicExchange",type = "topic")
            ))
    public void topicReceive1(String message)
        System.out.println("topic1消费者---aa---"+message);
    

    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue("topic2"),
                    key = "aa.*",
                    exchange = @Exchange(name="topicExchange",type = "topic")
            ))
    public void topicReceive2(String message)
        System.out.println("topic2消费者---aa---"+message);
    

    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue("topic3"),
                    key = "aa.#",
                    exchange = @Exchange(name="topicExchange",type = "topic")
            ))
    public void topicReceive3(String message)
        System.out.println("topic3消费者---aa---"+message);
    



 

 

 编写Topic模式消息发送

1.SendService接口

package com.it.rabbitmq;

public interface SendService 
    void sendMessage(String message);
    void sendFanoutMessage(String message);
    void sendTopicMessage(String message);

SendServiceImpl类

package com.it.rabbitmq.impl;

import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String message) 
        /**
         * 发送消息
         * 参数1为交换机名称
         * 参数2位RoutingKey
         * 参数3为具体发送的消息数据
         */
        amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
    

    @Override
    public void sendFanoutMessage(String message) 
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    

    @Override
    public void sendTopicMessage(String message) 
        amqpTemplate.convertAndSend("topicExchange","",message);
    


2.RabbitMQConfig类,提前声明一个topic的交换机

package com.it.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置一个direct类型的交换机
    @Bean
    public DirectExchange directExchange() 
        return new DirectExchange("bootDirectExchange");

    

    //配置一个队列
    @Bean
    public Queue directQueue() 
        return new Queue("bootDirectQueue");
    

    /**
     * 配置一个交换机和队列的绑定
     *
     * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
     * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue, DirectExchange directExchange) 
        //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
    

    //配置一个fanout类型的交换机
    @Bean
    public FanoutExchange fanoutExchange() 
        return new FanoutExchange("fanoutExchange");
    

    //配置一个topic类型的交换机
    @Bean
    public TopicExchange topicExchange() 
        return new TopicExchange("topicExchange");
    


3.运行主函数

package com.it;

import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class RabbitmqSpringbootApplication 

    public static void main(String[] args) 

        ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
        SendService sendService = (SendService) ac.getBean("sendService");
//        sendService.sendMessage("boot的测试数据");

//        sendService.sendFanoutMessage("boot的fanout测试数据!");

        sendService.sendTopicMessage("boot的topic测试数据,key:aa");
    


功能测试:

查看接收类

修改发送信息的routingkey

 

 修改发送信息的routingkey

 

 

以上是关于springboot集成rabbitmq:fanouttopic的主要内容,如果未能解决你的问题,请参考以下文章

什么是持续集成?

SYS.4系统集成及集成测试

Nodejs应用集成TS

模型集成(Ensemble)

持续集成的好处?

集成测试入门笔记