RabbitMQ学习之旅

Posted lurker-yaojiang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习之旅相关的知识,希望对你有一定的参考价值。

RabbitMQ学习总结(一)

RabbitMQ简介

RabbitMQ是一个消息代理,其接收并转发消息。类似于现实生活中的邮局:你把信件投入邮箱的过程,相当于往队列中添加信息,因为所有邮箱中的信件最终都会汇集到邮局中;当邮递员把你的新建发送给收件人的时候,相当于消息的转发。

RabbitMQ中的常见术语

  • 生产者(Provider):生产者负责生产消息,并将其发送到消息队列中
  • 队列(Queue):消息代理(Proxy)角色,从生产者那里接收消息,并将其转发到消费者进行消费。队列主要受限于主机的内存和磁盘的大小,其本质是一个消息缓冲区
  • 消费者(Consumer):消费者从队列中接收消息,并对消息进行处理
  • 交换机(Exchange):交换机位于生产者(Provider)和队列(Queue)之间,相当于路由(Routing),服务器会根据路由键(RoutingKey)将消息经由交换机路由到对应的队列(Queue)上去
  • 信道(Channel):信道是生产者与消费者和队列进行通信的渠道,其是建立在TCP连接上的虚拟连接;RabbitMQ在一条TCP连接上建立成千上百条信道来达到多线程处理,这个TCP被多个线程共享,每个线程对应一个信道,每个信道对应唯一的ID,保障信道私有性

RabbitMQ简单使用

STEP1:准备`Maven`依赖

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>RELEASE</version>
</dependency>

STEP2:生产者

public class Provider {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个信道
        Channel channel = connection.createChannel();    
        // 指定一个队列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 参数1 queue :队列名
        // 参数2 durable :是否持久化
        // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
        // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
        // 参数5 arguments
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        // 发送消息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 参数1 exchange :交换器
        // 参数2 routingKey : 路由键
        // 参数3 props : 消息的其他参数
        // 参数4 body : 消息体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent ‘" + message + "‘");
        // 关闭频道和连接  
        channel.close();
        connection.close();
    }
}

注意:声明队列是幂等的,代表着只有在队列不存在的时候才会被创建,多次声明并不会重复创建。

STEP3:消费者

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 指定一个队列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 参数1 queue :队列名
        // 参数2 durable :是否持久化
        // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
        // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
        // 参数5 arguments
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 创建队列消费者,此处是一个回调函数,当消费者接收到来自消息队列的消息时,会调用该接口中被重写的方法
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
 throws IOException 
{
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + message + "‘");
            }
        };
        // basicConsume(String queue, boolean autoAck, Consumer callback)
        // 参数1 queue :队列名
        // 参数2 autoAck : 是否自动ACK;为true的时候表示自动回复,为false的时候表示手动回复;自动回复的含义是,当消费
        // 者接收到消息的时候,就自动认定该消息已经被消费;手动回复的含义是,不进行自动回复,只有接收到方法发送到的确认信
        // 息,才确认该消息已经被消费
        // 参数3 callback : 消费者对象的一个接口,用来配置回调
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}




















































































以上是关于RabbitMQ学习之旅的主要内容,如果未能解决你的问题,请参考以下文章

我的RabbitMQ学习之旅3 (发布/订阅)

我的C语言学习进阶之旅解决 Visual Studio 2019 报错:错误 C4996 ‘fscanf‘: This function or variable may be unsafe.(代码片段

我的C语言学习进阶之旅解决 Visual Studio 2019 报错:错误 C4996 ‘fscanf‘: This function or variable may be unsafe.(代码片段

我的OpenGL学习进阶之旅NDK开发中find_library查找的系统动态库在哪里?

我的OpenGL学习进阶之旅NDK开发中find_library查找的系统动态库在哪里?

Unity Shader入门精要学习笔记 - 第5章 开始 Unity Shader 学习之旅