AMQP协议和rabbitMQ

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AMQP协议和rabbitMQ相关的知识,希望对你有一定的参考价值。

参考技术A

AMQP,即 高级消息队列协议 (Advanced Message Queuing Protocol),是一个消息中间件应用层协议,用于组件之间的解耦,来提供 统一消息服务。主要功能是 排序消息,路由消息(包括点对点和订阅-发布),保证消息的可靠性和安全性。
遵循AMPQ协议的客户端,都能通过 消息中间件 相互通信。这样 客户端 就可以采用不同的开发语言实现,彼此无强依赖关系,降低客户端复杂性,提高开发效率也利于后期维护。

AMQP 的模型架构如下:

rabbitMQ是AMQP协议的一个开源实现。架构模型同样可以用以下的图来表示:

如上图,simple模式,单个publisher,单个queue,单个consumer

如上图,work模式
多个consumer共用一个queue的message
此种模式下,rabbitMQ会自动做负载均衡,将消息轮询发送给各个消费者,即一个消息只能被一个消费者获取

如上图,publish / subscribe 发布订阅模式(广播模式)
相对前2种模式,多了一个 exchange (type为fanout) ,message先发送到exchange,exchange再分别发送到对应的所有queue。而consumer订阅自己的queue,在自己订阅的queue上消费message。

示例应用场景,如下图示:

比如 网上购物,下单支付成功后,通知用户的方式有许多种,app推送,短信,email 等等。
message到来后被exchange发送到3个queue(app推送q,短信q,email_q)
之后 app推送服务,短信通知服务,email通知服务 从各自订阅的queue获取消息,通知用户支付成功

如上图示,exchange类型设定为direct
此时 message中的rountingKey 和 exchange中的bindingKey匹配,两者相等则发送对应的queue中,如果匹配不到bindingKey,则丢弃该message。

示例应用场景,如下图示:

比如服务产生的日志,日志有许多类型,error,info,debuf等类型的日志,而我们的需求只想要将 error 类型的日志写入磁盘,就可以用routing模式,将error日志路由到error queue,再由相应的 写入磁盘服务获取message,写入磁盘

如上图示,exchange类型为topic,相对于第4种模式,相同点是都根据 rountingKey 匹配,不同点是 topic 模式支持模糊匹配。

MQ-死信队列实现消息延迟

死信队列实现消息延迟

一、延迟队列

延迟队列:消息进入到队列之后,延迟指定的时间才能被消费者消费。

AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能。

TTL就是消息的存活时间,RabbitMQ可以分别对队列和消息设置存活时间。

  • 在创建队列的时候可以设置队列的存活时间,消息进入队列后,在存活时间内没有被消费者消费,则此消息会从当前队列移除。
  • 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除。
  • 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列。

二、使用延迟队列实现订单支付监控

  • 实现流程图如图:

  • 创建路由交换机

  • 创建消息队列

  • 创建死信队列

  • 队列绑定

  • 发送消息到交换机delay_exchange的k1(即消息队列delay_queue1)

    //普通maven项目演示
    //发送消息
    public class SendMsg 
    
        public static void main(String[] args) 
            System.out.println("请输入消息:");
            Scanner input = new Scanner(System.in);
            String msg = input.nextLine();
    
            Connection connection = null;
    
            try 
                //获取连接,相当于JDBC的获取数据库连接
                connection = MQUtil.getConnection();
     
                Channel channel = connection.createChannel();
    
                //发消息之前开启消息确认
                channel.confirmSelect();
    
                channel.basicPublish("delay_exchange","k1",null,msg.getBytes());
    
                //消息发送之后等待消息反馈
                try 
                    boolean b = channel.waitForConfirms();
                    System.out.println("发送--->" + msg + (b ? "成功": "失败"));
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
    
                //关闭
                channel.close();
                connection.close();
    
             catch (IOException e) 
                e.printStackTrace();
             catch (TimeoutException e) 
                e.printStackTrace();
            
    
    

    发送消息:


此时开启接收队列delay_queue2(而不是delay_queue1)的消息:会发现不能实时接收,需要等到delay_queue1的TTL时间到后才能成功接收到消息。

//普通maven项目演示
//接受消息
public class ReceiveMsg 

    public static void main(String[] args) 

        Connection connection = null;

        //获取连接,相当于JDBC的获取数据库连接
        try 
            connection = MQUtil.getConnection();
            Channel channel = connection.createChannel();

            //声明要关注的队列
            //channel.queueDeclare("queue1", false, false, false, null);
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) 
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer2消费消息:'" + message + "'");
                
            ;
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume("delay_queue2", true, consumer);

         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    

由于在前面创建死信队列设置的delay_queue1的TTL时间为10s,因此间隔10s后成功接收到消息:


演示完毕!

以上是关于AMQP协议和rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RIP协议和OSPF协议

Http协议和Https协议

Http协议和Https协议

BLE GAP 协议和 GATT 协议

HTTP协议和HTTPS协议

RIP协议和OSPF协议