SpringBoot整合RabbitMQ(源代码)

Posted ITdfq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ(源代码)相关的知识,希望对你有一定的参考价值。

SpringBoot整合RabbitMQ


1. 配置类:服务器配置、创建交换器、创建队列、创建绑定关系
2. 生产者:使用路由键发送消息(使用template)
3. 消费者:监听类(监听队列) 推模式


常用的交换器有三种,分别为 direct、fanout、topic

  1. direct要求消息的路由键与绑定键完全一致,例如绑定键为 Key=“red” 时,只接收路由键为 red 的消息
  2. fanout是广播,不考虑key全部发送
  3. topic按模式匹配路由键:小数点代表单词分隔符,`*` 匹配一个单词;绑定 red.* 可以接收 red.任意单个单词(例如 red.82),而绑定 red.82 就只能接收 red.82

配置类

package com.itdfq.rabbitmq.config;



import com.itdfq.rabbitmq.reveiver.Receiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * Spring configuration for the RabbitMQ demo.
 *
 * <p>Declares the connection factory, the admin and template beans, three
 * exchanges (direct, topic, fanout), two queues, the bindings between them,
 * the publisher confirm/return callbacks, and a manually-acknowledging
 * listener container for {@code queue1}.
 *
 * @Author GocChin
 * @Date 2021/5/1 21:22
 * @Blog: itdfq.com
 * @QQ: 909256107
 * @Descript: configuration class
 */
@Configuration
public class RabbitConfig {
    @Value("${spring.rabbitmq.addresses}")
    private String address;
    @Value("${spring.rabbitmq.port}")
    private String port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /** Listener that consumes {@code queue1} with manual acknowledgement. */
    @Autowired
    private Receiver receiver;

    /**
     * Builds the connection factory from the {@code spring.rabbitmq.*} properties.
     *
     * @return a caching connection factory with publisher confirms enabled
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // NOTE(review): assumes spring.rabbitmq.addresses holds a bare host name,
        // because the port is appended here - confirm against application.properties.
        connectionFactory.setAddresses(address+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Enable publisher confirms so the confirm callback set on the template is used.
        connectionFactory.setPublisherConfirms(true);
        return  connectionFactory;
    }

    /**
     * RabbitAdmin wraps the broker management operations.
     *
     * @param connectionFactory the connection factory to administer through
     * @return the admin bean
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Template used by the producer to send messages.
     *
     * @return a template with the confirm and return callbacks installed
     */
    @Bean
    public RabbitTemplate newRabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // Callback for the broker's publisher confirm.
        rabbitTemplate.setConfirmCallback(confirmCallback());
        // Mandatory flag plus return callback: report messages that could not be routed.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallback());
        return rabbitTemplate;
    }

    // ---- Exchange declarations ----

    /** @return the direct exchange named {@code DirectExchange} */
    @Bean
    public DirectExchange DirectExchange(){
        return new DirectExchange("DirectExchange");
    }

    /** @return the topic exchange named {@code TopicExchange} */
    @Bean
    public TopicExchange TopicExchange(){
        return new TopicExchange("TopicExchange");
    }

    /** @return the fanout exchange named {@code FanoutExchange} */
    @Bean
    public FanoutExchange FanoutExchange(){
        return new FanoutExchange("FanoutExchange");
    }

    // ---- Queue declarations ----

    /** @return the queue named {@code queue1} */
    @Bean
    public Queue queue1(){
        return new Queue("queue1");
    }

    /** @return the queue named {@code queue2} */
    @Bean
    public Queue queue2(){
        return new Queue("queue2");
    }

    // ---- Bindings ----

    /** Binds queue1 to the direct exchange with routing key {@code red}. */
    @Bean
    public Binding bindingQueue1Direct(){
        return BindingBuilder.bind(queue1())
                .to(DirectExchange())
                .with("red");
    }

    /**
     * Binds queue2 to the direct exchange with routing key {@code white}.
     * Previously this bound queue1 with the misspelled key "while".
     */
    @Bean
    public Binding bindingQueue2Direct(){
        return BindingBuilder.bind(queue2())
                .to(DirectExchange())
                .with("white");
    }

    /** Binds queue1 to the fanout exchange. */
    @Bean
    public Binding bindingQueue1Fanout(){
        return BindingBuilder.bind(queue1())
                .to(FanoutExchange());
    }

    /** Binds queue2 to the fanout exchange (previously bound queue1 a second time). */
    @Bean
    public Binding bindingQueue2Fanout(){
        return BindingBuilder.bind(queue2())
                .to(FanoutExchange());
    }

    /** Binds queue1 to the topic exchange with pattern {@code red.*}. */
    @Bean
    public Binding bindingQueue1Topic(){
        return BindingBuilder.bind(queue1())
                .to(TopicExchange())
                .with("red.*");
    }

    /**
     * Binds queue2 to the topic exchange with key {@code white.82}
     * (previously bound queue1 instead of queue2).
     */
    @Bean
    public Binding bindingQueue2Topic(){
        return BindingBuilder.bind(queue2())
                .to(TopicExchange())
                .with("white.82");
    }

    // ---- Publisher confirm ----

    /**
     * Callback that prints whether the publish was confirmed.
     *
     * @return the confirm callback installed on the template
     */
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback(){
        return new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("发送者确认发送给mq成功");
                }else{
                    System.out.println("发送者发送失败,考虑重发"+cause);
                }
            }
        };
    }

    // ---- Return (unroutable) notification: invoked on failure only ----

    /**
     * Callback that prints the details of a returned message.
     *
     * @return the return callback installed on the template
     */
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback(){
        return new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("无效路由信息,需要考虑另外处理");
                System.out.println("Returned replayText:"+replyText);
                System.out.println("Returned exchange:"+exchange);
                System.out.println("Returned rountingKey:"+routingKey);
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Returned Message:"+body);
            }
        };
    }

    // ---- Consumer acknowledgement ----

    /**
     * Listener container that delivers {@code queue1} to {@link Receiver}
     * with manual acknowledgement.
     *
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        // Queue to consume from.
        container.setQueues(queue1());
        // The listener acknowledges each message itself.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Listener that performs the ack/nack.
        container.setMessageListener(receiver);
        return container;
    }
}

生产者

package com.itdfq.rabbitmq.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;



/**
 * REST endpoints that publish a test message to each of the three exchanges.
 *
 * @Author GocChin
 * @Date 2021/5/1 22:04
 * @Blog: itdfq.com
 * @QQ: 909256107
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitProducter {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes to the direct exchange.
     *
     * @param Key routing key; required (the {@code @RequestParam} default)
     * @return a fixed confirmation string
     */
    @GetMapping("/direct")
    public String direct(@RequestParam String Key){
        publish("DirectExchange", "direct", "DirectSender", Key);
        return "发送direct消息成功";
    }

    /** Simple endpoint returning a fixed string. */
    @RequestMapping("/asd")
    public String index(){
        return "jieshaole";
    }

    /**
     * Publishes to the topic exchange.
     *
     * @param Key routing key; required (the {@code @RequestParam} default)
     * @return a fixed confirmation string
     */
    @GetMapping("/topic")
    public String topic(@RequestParam String Key){
        publish("TopicExchange", "topic", "TopicSender", Key);
        return "发送topic消息成功";
    }

    /**
     * Publishes to the fanout exchange.
     *
     * @param Key routing key; required (the {@code @RequestParam} default)
     * @return a fixed confirmation string
     */
    @GetMapping("/fanout")
    public String fanout(@RequestParam String Key){
        publish("FanoutExchange", "fanout", "FanoutSender", Key);
        return "发送fanout消息成功";
    }

    /**
     * Builds the payload "key(K),exchange(LABEL)-TIMESTAMP", prints it with
     * the given prefix, and sends it to the named exchange.
     */
    private void publish(String exchangeName, String label, String logPrefix, String routingKey){
        StringBuilder payload = new StringBuilder();
        payload.append("key(").append(routingKey)
               .append("),exchange(").append(label).append(")-")
               .append(System.currentTimeMillis()); // millisecond timestamp
        String text = payload.toString();
        System.out.println(logPrefix + text);
        rabbitTemplate.convertAndSend(exchangeName, routingKey, text);
    }

}

消费者

  • 自动确认的消费者
package com.itdfq.rabbitmq.reveiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for {@code queue2}, declared through {@code @RabbitListener}.
 *
 * @Author GocChin
 * @Date 2021/5/1 22:26
 * @Blog: itdfq.com
 * @QQ: 909256107
 * @Desrcipt: listener class
 */
@Component
@RabbitListener(queues = "queue2") // queue to listen on; no explicit ack in this class
public class Consumer2 {

    /**
     * Prints each received message.
     *
     * @param msg the message payload as a string
     */
    @RabbitHandler  // marks the method that handles the delivered payload
    public void process(String msg){
        // Label corrected from "Consumer1" so the output names the class that produced it.
        System.out.println("Consumer2-Receiver:"+msg);
    }
}
  • 手动确认的消费者
package com.itdfq.rabbitmq.reveiver;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @Author GocChin
 * @Date 2021/5/1 23:56
 * @Blog: itdfq.com
 * @QQ: 909256107
 * @Decript: 消费queue1
 */
@Component
public class Receiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String msg=new String(message.getBody());
            System.out.println("Receiver>>>>>>>>接收到消息:"+msg);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("Receiver>>>>>>>>消息已消费");
            } catch (Exception e) {
                System.out.println(e.getMessage());
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                System.out.println("Receiver>>>>>>拒绝消息,要求MQ重新发送");
                e.printStackTrace();
                throw e;
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }

    }
  • 源代码

给博主点个Star,谢谢

以上是关于SpringBoot整合RabbitMQ(源代码)的主要内容,如果未能解决你的问题,请参考以下文章

Springboot整合Rabbitmq(史上最详细)

springboot rabbitmq整合

SpringBoot整合RabbitMQ实现死信队列

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

SpringBoot整合RabbitMQ,实现消息发送和消费

SpringBoot系列5SpringBoot整合RabbitMQ