服务异步通信RabbitMQ

Posted youwhua

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了服务异步通信RabbitMQ相关的知识,希望对你有一定的参考价值。

服务异步通信

目录

1. 初识MQ

  1. 同步通讯
    直播、微服务间基于Feign的调用
    问题:耦合高、性能下降(调用耗时、等待响应(资源浪费)、吞吐量下降、服务阻塞、级联失败
  2. 异步通讯
    微信聊天
    事件驱动模式

    优势一:服务解耦
    优势二:性能提升,吞吐量提高
    优势三:服务没有强依赖,解决级联失败问题
    优势四:流量消峰
    问题一:依赖于Broker的可靠性、安全性、吞吐能力
    问题二:架构复杂了,业务没有明显的流水线,不好追踪管理
  3. 异步通讯框架
    MQ,存放消息的队列,也就是事件驱动架构中的Broker。

2. RabbitMQ快速入门

  1. RabbitMQ概述与安装
    基于Erlang语言开发的开源消息通信中间件
    使用docker安装
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \\
 -e RABBITMQ_DEFAULT_USER=admin \\
 -e RABBITMQ_DEFAULT_PASS=admin \\
 --name mq \\
 --hostname mq1 \\ 
 -p 15672:15672 \\
 -p 5672:5672 \\
 -d \\
 rabbitmq:3-managment 

登录管理界面

  1. 常见消息模型
    publisher:消息发布者,将消息发送到队列queue
    queue:消息队列,负责接收并缓存消息
    consumer:订阅队列,处理队列中的消息
    subscribe:订阅
  • 基本消息队列 BasicQueue
    HelloWorld
  • 工作消息队列 WorkQueue
    可以提高消息处理速度,避免队列消息堆积

    消息预取机制:在处理消息之前,取出所有消息
    解决:每次只能获取一条消息,处理完成才能获取下一个消息
spring:
 rabbitmq:
  listener:
   simple:
    prefetch: 1
  • 发布订阅
    允许将同一消息发送给多个消费者。
    Exchange:只负责消息的转发
    广播 Fanout Exchange
    将收到的消息路由到每一个跟其绑定的queue

    publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......

consumer

@Configration
public class FanoutConfig 
 @Bean
 public FanoutExchange fanoutExchange()
  return new FanoutExchange("name.fanout");
 
 @Bean
 public Queue fanoutQueue1()
	  return new Queue("name.queue1");
 
 @Bean
 public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange)
	  return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
 
 @Bean
 public Queue fanoutQueue2()
	  return new Queue("name.queue2");
 
 @Bean
 public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange)
	  return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
 
 

路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。

每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同

consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener 
 @RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = "key1","key2"))
 public void listenerDirectQueue1(String msg)
  // 业务逻辑
 
 @RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = "key1","key2"))
 public void listenerDirectQueue2(String msg)
  // 业务逻辑
 

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......

主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener 
 @RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))
 public void listenerTopicQueue2(String msg)
  // 业务逻辑
 

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......

  1. 快速入门
# publisher
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World";
channel.basicPubulsh("", queueName, null, message.getBytes);
// 关闭通道、连接
channel.close();
connetion.close();
# consumer
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅消息
String message = "Hello World";
channel.basicPubulsh(queueName, true, new DefaultConsumer()channel
@Override
pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException 
 // 处理消息
 
);
// 关闭通道、连接
channel.close();
connetion.close();

3. SpringAMQP

  • AMQP,应用程序之间传递消息的协议,与平台无关。

  • SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

    监听器容器,用于异步处理入站消息
    用于发送和接收消息的RabbitTemplate
    RabbitAdmin用于自动声明队列,交换和绑定

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

  1. 引入依赖
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. publisher 发送消息
spring:
 rabbitmq:
  host: 127.0.0.1 # rabbitMQ ip地址
  port: 5672
  virtual-host: /
  username: admin
  password: admin
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;

......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......

  1. consumer 监听消息
spring:
 rabbitmq:
  host: 127.0.0.1 # 主机名
  port: 5672
  virtual-host: /
  username: admin
  password: admin
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener 
 @RabbitListener(queue = "队列名称")
 public void listenerSimpleQueue(String msg)
  // 业务逻辑
 

消息转换器
RabbitTemplate ,将message对象序列化为字节

修改序列化方式

父工程

<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>

publisher

@Configration
public class SpringRabbitListener 
 @Bean
 public MessageConverter messageConverter()
  return new Jackson2JsonMessageConverter();
 

最后

以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记

以上是关于服务异步通信RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

服务异步通信RabbitMQ

微服务的异步通信技术RabbitMQ

Python开发项目:RPC异步执行命令(RabbitMQ双向通信)

RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)SpringAMQP发布订阅模式(FanoutExchangeDirectExchangeTopicExchange)消息转换器

熬夜整理的RabbitMQ知识点相当齐全的文章

熬夜整理的RabbitMQ知识点相当齐全的文章