SpringBoot整合RabbitMQ(源代码)
Posted ITdfq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ(源代码)相关的知识,希望对你有一定的参考价值。
SpringBoot整合RabbitMQ
1. 配置类:服务器配置、创建交换器、创建队列、创建绑定关系
2. 生产者:使用路由键发送消息(使用template)
3. 消费者:监听类(监听队列) 推模式
交换器有三种分别为 direct fanout topic
- direct传递的为Key=“red”
- fanout是广播,不考虑key全部发送
- topic绑定的类型为red.* 可以接受red.任意值 小数点代码分隔符 red.82 就只能接受red.82
配置类
package com.itdfq.rabbitmq.config;
import com.itdfq.rabbitmq.reveiver.Receiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author GocChin
* @Date 2021/5/1 21:22
* @Blog: itdfq.com
* @QQ: 909256107
* @Descript: 配置类
*/
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.addresses}")
private String address;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Autowired
private Receiver receiver;
//连接工厂
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(address+":"+port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//TODO 消息发送确认--回调
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
//RabbitAdmin类封装对RabbitMQ的管理操作
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
//使用Template
@Bean
public RabbitTemplate newRabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
//设置监听确认mq(交换器)接受到信息
rabbitTemplate.setConfirmCallback(confirmCallback());
//添加监听 失败鉴定(路由没有收到)
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallback());
return rabbitTemplate;
}
//声明交换器
//Direct交换器
@Bean
public DirectExchange DirectExchange(){
return new DirectExchange("DirectExchange");
}
//topic交换器
@Bean
public TopicExchange TopicExchange(){
return new TopicExchange("TopicExchange");
}
//Fanout交换器
@Bean
public FanoutExchange FanoutExchange(){
return new FanoutExchange("FanoutExchange");
}
//申明队列
@Bean
public Queue queue1(){
return new Queue("queue1");
}
@Bean
public Queue queue2(){
return new Queue("queue2");
}
//绑定关系
//queue1与direct绑定
@Bean
public Binding bindingQueue1Direct(){
return BindingBuilder.bind(queue1())
.to(DirectExchange())
.with("red");
}
//queue2与direct绑定
@Bean
public Binding bindingQueue2Direct(){
return BindingBuilder.bind(queue1())
.to(DirectExchange())
.with("while");
}
//queue1与fanout绑定
@Bean
public Binding bindingQueue1Fanout(){
return BindingBuilder.bind(queue1())
.to(FanoutExchange());
}
//queue2与fanout绑定
@Bean
public Binding bindingQueue2Fanout(){
return BindingBuilder.bind(queue1())
.to(FanoutExchange());
}
//queue1与Topic绑定
@Bean
public Binding bindingQueue1Topic(){
return BindingBuilder.bind(queue1())
.to(TopicExchange())
.with("red.*");
}
//queue2与Topic绑定
@Bean
public Binding bindingQueue2Topic(){
return BindingBuilder.bind(queue1())
.to(TopicExchange())
.with("white.82");
}
//****************生产者发送确认********************
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback(){
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("发送者确认发送给mq成功");
}else{
System.out.println("发送者发送失败,考虑重发"+s);
}
}
};
}
//****************失败通知********************
//失败才通知,成功不通知
@Bean
public RabbitTemplate.ReturnCallback returnCallback(){
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String replayText, String exchange, String rountingKey) {
System.out.println("无效路由信息,需要考虑另外处理");
System.out.println("Returned replayText:"+replayText);
System.out.println("Returned exchange:"+exchange);
System.out.println("Returned rountingKey:"+rountingKey);
String s = new String(message.getBody());
System.out.println("Returned Message:"+s);
}
};
}
//****************消费者确认********************
@Bean
public SimpleMessageListenerContainer messageListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
//绑定队列
container.setQueues(queue1());
//手动提交
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//消费者确认方法
container.setMessageListener(receiver);
return container;
}
}
生产者
package com.itdfq.rabbitmq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author GocChin
* @Date 2021/5/1 22:04
* @Blog: itdfq.com
* @QQ: 909256107
*/
@RestController
@RequestMapping("/rabbit")
public class RabbitProducter {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* direct
*/
@GetMapping("/direct")
public String direct(@RequestParam(required = true) String Key){//mq消息的发送 true表示必须传递参数
String sendMsg = "key("+Key+"),exchange(direct)-"+System.currentTimeMillis(); //currentTimeMillis时间戳
System.out.println("DirectSender"+sendMsg);
this.rabbitTemplate.convertAndSend("DirectExchange",Key,sendMsg);
return "发送direct消息成功";
}
@RequestMapping("/asd")
public String index(){
return "jieshaole";
}
/**
* topic
*/
@GetMapping("/topic")
public String topic(@RequestParam(required = true) String Key){//mq消息的发送 true表示必须传递参数
String sendMsg = "key("+Key+"),exchange(topic)-"+System.currentTimeMillis();
System.out.println("TopicSender"+sendMsg);
this.rabbitTemplate.convertAndSend("TopicExchange",Key,sendMsg);
return "发送topic消息成功";
}
/**
* fanout
*/
@GetMapping("/fanout")
public String fanout(@RequestParam(required = true) String Key){//mq消息的发送 true表示必须传递参数
String sendMsg = "key("+Key+"),exchange(fanout)-"+System.currentTimeMillis();
System.out.println("FanoutSender"+sendMsg);
this.rabbitTemplate.convertAndSend("FanoutExchange",Key,sendMsg);
return "发送fanout消息成功";
}
}
消费者
- 自动确认的消费者
package com.itdfq.rabbitmq.reveiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author GocChin
* @Date 2021/5/1 22:26
* @Blog: itdfq.com
* @QQ: 909256107
* @Desrcipt:监听类
*/
@Component
@RabbitListener(queues = "queue2") //监听的队列 自动确认
public class Consumer2 {
@RabbitHandler //根据这个注解进行执行方法
public void process(String msg){
System.out.println("Consumer1-Receiver:"+msg);
}
}
- 手动确认的消费者
package com.itdfq.rabbitmq.reveiver;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Author GocChin
* @Date 2021/5/1 23:56
* @Blog: itdfq.com
* @QQ: 909256107
* @Decript: 消费queue1
*/
@Component
public class Receiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String msg=new String(message.getBody());
System.out.println("Receiver>>>>>>>>接收到消息:"+msg);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("Receiver>>>>>>>>消息已消费");
} catch (Exception e) {
System.out.println(e.getMessage());
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
System.out.println("Receiver>>>>>>拒绝消息,要求MQ重新发送");
e.printStackTrace();
throw e;
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}
- 源代码
以上是关于SpringBoot整合RabbitMQ(源代码)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例