springboot-rabbitmq
Posted 丶落幕
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot-rabbitmq相关的知识,希望对你有一定的参考价值。
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
springboot-rabbitmq
使用步骤
1.引入依赖
核心依赖(示例):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置yml
代码如下(示例):
server:
port: 9001
spring:
rabbitmq:
host: 192.168.249.130
port: 5672
username: admin
password: admin
virtual-host: /
3.声明交换机,队列,并绑定关系
第一种:通过config(示例):
@Configuration
public class RabbitmqConfig {
//1.声明注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout-order-exchange",true,false);
}
//2.声明队列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
@Bean
public Queue smsQueue(){
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue(){
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue duanxinQueue(){
return new Queue("duanxin.fanout.queue", true);
}
//3.完成绑定关系
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding duanxinBinding(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}
第二种:通过注解绑定关系
注意:这里只是绑定关系,并没有声明交换机,队列(推荐使用config)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.fanout.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "fanout-order-exchange",type = ExchangeTypes.FANOUT),
key = ""
))
@Service
public class FanoutEmailConsumer {
@RabbitHandler
public void getMessage(String message){
System.out.println("email fanout----接收到的订单信息是:"+message);
}
}
4.业务类(provider=“发送消息”)
代码如下(示例):
@Service
public class OrderService {
@Autowired
RabbitTemplate rabbitTemplate;
public void makeOrder(){
//1.根据商品id查询库存是否充足
//2.保存订单
String orderId=UUID.randomUUID().toString();
System.out.println("订单生成成功:"+orderId);
//3.通过mq来完成消息的分发
//参数1.交换机 参数2.路由key/队列名 参数3.消息内容
String exchangeName = "fanout-order-exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}
5.业务类(consumer=“消费消息”)
代码如下(示例):
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class FanoutEmailConsumer {
@RabbitHandler
public void getMessage(String message){
System.out.println("email fanout----接收到的订单信息是:"+message);
}
}
高级特性
1.TTL(设置过期时间)
x-message-ttl(示例):
@Bean
public Queue emailQueue(){
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl", 5000);//这里一定是一个int类型
return new Queue("email.fanout.queue", true,false,false,args);
}
设置单条消息过期时间(示例):
@Service
public class OrderService {
@Autowired
RabbitTemplate rabbitTemplate;
public void makeOrder(){
//1.根据商品id查询库存是否充足
//2.保存订单
String orderId=UUID.randomUUID().toString();
System.out.println("订单生成成功:"+orderId);
//3.通过mq来完成消息的分发
//参数1.交换机 参数2.路由key/队列名 参数3.消息内容
String exchangeName = "fanout-order-exchange";
String routingKey = "";
MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
}
}
TTL队列消息过期会被转移到死信队列中
而单独设置的过期消息则会被直接移除
2.DLX(死信队列)
x-dead-letter-exchange(死信交换机)
x-dead-letter-routing-key(死信routingkey)(示例):
@Bean
public Queue emailQueue(){
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl", 5000);//这里一定是一个int类型
args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
return new Queue("email.fanout.queue", true,false,false,args);
}
3.LIM(最大队列长度)
x-max-length(示例):
@Bean
public Queue emailQueue(){
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl", 5000);//这里一定是一个int类型
args.put("x-max-length", 5);
args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
return new Queue("email.fanout.queue", true,false,false,args);
}
死信交换机其实也是一个普通的交换机,类型可以按需设置
当当前交换机消息过期时,会被转入死信交换机,并发送给死信队列
单机集群搭建
搭建集群(示例):
#启动第一个节点rabbit-1
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
#启动第一个节点rabbit-2
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
#停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
#启动应用
sudo rabbitmqctl -n rabbit-1 start_app
#重复步骤
sudo rabbitmqctl -n rabbit-2 stop_app
sudo rabbitmqctl -n rabbit-2 reset
#将rabbit-2作为从节点连接rabbit-1 @后面为服务器的主机名[root@localhost ~]
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@localhost
#启动rabbit-2
sudo rabbitmqctl -n rabbit-2 start_app
#查看集群信息
sudo rabbitmqctl cluster_status -n rabbit-1
开启web监控(示例):
#开启web查件
rabbitmq-plugins enable rabbitmq_management
#创建用户
rabbitmqctl -n rabbit-1 add_user admin admin
#赋予身份
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
#赋予权限
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
#重复rabbit-2(这里显示已经存在,可能集群user共通的吧)
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
总结
无论发送消息还收消费消息,都需要连接rabbitmq
声明交换机,队列,绑定关系,在provider或者consumer都是可以的
这里的示例是fanout模式 FanoutExchange
direct模式则是 DirectExchange(绑定队列得加上路由key)
示例: BindingBuilder.bind(duanxinQueue()).to(directExchange()).with(“duanxin”);
以上是关于springboot-rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章