消息队列:RabbitMQ安装和快速入门

Posted 花_城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列:RabbitMQ安装和快速入门相关的知识,希望对你有一定的参考价值。

文章目录

一、消息队列

在计算机中,消息是一个程序发送给另一个程序的数据,而队列(一种数据结构)就是一个容器,存放在里面的东西,都是先放进来的先被拿出去。所以,消息队列就是在消息的传输过程中暂时保存消息的容器。使用队列,能够避免在收、发消息需要同步进行的弊端,可以让发送方直接将消息放如队列,避免因为等待接收方而阻塞。

1.1 作用

  • 应用解耦:

    把一个大系统拆分为多个小系统,互相之间用消息队列来做数据的交互或函数的调用。

  • 流量削峰:

    在流量特别大的应用场景内,比如秒杀活动。这种情况下,服务端很容易崩溃。但是我直接拒绝用户,对用户来说是一种极为不好的用户体验。所以,我们可以控制处理请求的速度,将暂时处理不了的请求放入消息队列,让用户稍等一会儿,这比直接拒绝好很多。

  • 消息分发:

    就是将数据发送给需要接收数据的其他主机、程序等

  • 异步处理:

    比如,在用户成功购买某一件商品后,同时向他发送订单的手机短信和电子邮件,然后跳转到成功页面:

    • 没有消息队列:先将订单数据交给短信模块发送手机短信,然后交给邮件模块发送电子邮件,完成后跳转到成功页面。
    • 使用消息队列:直接跳转到成功页面,至于手机短信和电子邮件,可以把订单数据放入消息队列中,短信模块和邮件模块在后台异步读取数据并发送。

1.2 主流消息队列比较

KafkaRocketMQRabbitMQ
单机吞吐量十万级十万级万级
消息延迟毫秒级毫秒级微秒级
可用性非常高(分布式)非常高(分布式)高(主从)
消息丢失理论上不会丢失理论上不会丢失
社区活跃度

二、RabbitMQ的安装

2.1 安装

2.1.1 Docker 方式

# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

# for RabbitMQ 3.8,
# 3.8.x support timeline: https://www.rabbitmq.com/versions.html
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management

2.1.2 原生方式(Ubuntu 20.04)

  1. 将下面的代码,保存到rabbitMQ_install.sh文件中:

    #!/usr/bin/sh
    
    sudo apt-get install curl gnupg apt-transport-https -y
    
    ## Team RabbitMQ's main signing key
    curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
    ## Cloudsmith: modern Erlang repository
    curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
    ## Cloudsmith: RabbitMQ repository
    curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null
    
    ## Add apt repositories maintained by Team RabbitMQ
    sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
    ## Provides modern Erlang/OTP releases
    ##
    deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main
    deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main
    
    ## Provides RabbitMQ
    ##
    deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main
    deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main
    EOF
    
    ## Update package indices
    sudo apt-get update -y
    
    ## Install Erlang packages
    sudo apt-get install -y erlang-base \\
                            erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \\
                            erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \\
                            erlang-runtime-tools erlang-snmp erlang-ssl \\
                            erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
    
    ## Install rabbitmq-server and its dependencies
    sudo apt-get install rabbitmq-server -y --fix-missing
    
  2. 给文件添加执行权限:

    sudo chmod +x ./rabbitMQ_install.sh
    
  3. 执行脚本:

    sudo bash ./rabbitMQ_install.sh
    

2.2 管理插件的用法

  1. 启动 RabbitMQ服务:

    sudo systemctl start rabbitmq-server
    
  2. 创建 RabbitMQ用户:

    sudo rabbitmqctl add_user '用户名' '密码'
    

    其实,RabbitMQ自带了一个来宾用户,用户名和密码都是guest

  3. 授予管理员权限:

    sudo rabbitmqctl set_user_tags 用户名 administrator 
    
  4. 向虚拟主机中的用户授予所有权限:

    sudo rabbitmqctl set_permissions -p / 用户名 '.*' '.*' '.*'
    
  5. 启用管理插件:

    sudo rabbitmq-plugins enable rabbitmq_management
    
  6. 访问管理页面:http://localhost:15672/,输入前面创建的用户名和密码即可。

三、RabbitMQ快速入门

3.1 名词介绍

  • 生产(Producing):即发送消息,发送消息的程序被称为“生产者( producer)”。

  • 消费(Consuming):即接收消息,接收消息的程序被称为“消费者(consumer)

  • 队列(Queue):就是存放消息的地方。队列只受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。

    可以有多个生产者将消息发送到一个队列,也可以有多个消费者尝试从一个队列接收数据。

注意:生产者、消费者、消息队列可以放在不同的机器上;生产者和消费者也可以是同一个程序

3.2 Hello World!

接下来,我们用 python 代码编写两个小程序,生产者发送消息(将消息放入队列),消费者接收消息(从队列取出消息),消息内容为“Hello World!”。这是一个最精简的 RabbitMQ 的使用过程。

  1. 安装 RabbitMQ 官方推荐的 python 客户端:

    pip install pika
    
  2. 发送消息,新建 send.py 文件:

    #!/usr/bin/env python
    import pika
    
    # 连接到本地的消息队列
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 创建队列,队列名称为 hello
    channel.queue_declare(queue='hello')
    
    # 发送消息
    channel.basic_publish(exchange='',
                          routing_key='hello',  # 队列名称
                          body='Hello World!')  # 消息内容
    print(" [x] 发送 'Hello World!'")
    
    # 关闭连接
    connection.close()    # 会自动刷新网络缓存区,确保消息被发送
    
  3. 接收消息,新建 receive.py 文件:

    #!/usr/bin/env python
    import pika, sys, os
    
    
    def main():
        # 连接到本地的消息队列
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
    
        # 创建队列,队列名称为 hello
        channel.queue_declare(queue='hello')
    
        # 回调函数,每当接收到消息时被调用
        def callback(ch, method, properties, body):
            print(" [x] 接收到 %r" % body)
    
        channel.basic_consume(queue='hello',    # 指定接受消息的队列
                              auto_ack=True,    # 是否自动调用回调函数
                              on_message_callback=callback)    # 指定回调函数
    
        print(' [*] 等待消息. 按下 CTRL+C 退出')
        # 进入一个循环,等待消息
        channel.start_consuming()
    
    if __name__ == '__main__':
        try:
            main()
        except KeyboardInterrupt:
            print('Interrupted')
            try:
                sys.exit(0)
            except SystemExit:
                os._exit(0)
    

    注意:消费者和生产者都进行了同一个队列的创建,目的是无论哪一方先被启动,都能确保队列存在

  4. 启动消费者:

    python receive.py
    # [*] 等待消息. 按下 CTRL+C 退出
    # [x] 接收到 b'Hello World!'
    
  5. 启动生产者:

    python send.py
    # [x] 发送 'Hello World!'
    

3.3 任务队列(work queue)

任务队列(也叫工作队列)是存储了一些资源密集型任务的队列,这些任务被封装成一个个的消息。目的是为了避免资源密集型任务对当前进程的阻塞,在后台中让专门的任务处理进程(worker)从任务队列中取出任务(就是消息,这里叫任务更加贴切)去执行。而且,当有多个任务处理进程一起工作时,便于任务在它们之间共享

  1. 修改 send.py 中的代码,保存为 new_task.py ,要修改的部分如下:

    import sys
    
    # 要处理的任务
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)    # 改为任务
    
  2. 修改 receive.py 中的代码,保存为 worker.py:

    import time
    
    def callback(ch, method, properties, body):
        print(" [x] 接收到 %r" % body.decode())
        # 消息体中有几个“.”就睡几秒钟,假装任务处理很耗时
        time.sleep(body.count(b'.'))
        print(" [x] 完成")
    

3.3.1 循环调度

使用任务队列的优点之一是能够轻松地扩大规模。如果我们的任务被大量积压,来不及处理,我们可以很轻松地增加更多的任务处理进程(以下都将简称为 worker)。

  1. 启动两个终端,运行2个 worker.py:

    python worker.py
    # [*] 等待消息. 按下 CTRL+C 退出
    
  2. 再启动一个终端,运行 new_task.py:

    python new_task.py First message.
    python new_task.py Second message..
    python new_task.py Third message...
    python new_task.py Fourth message....
    python new_task.py Fifth message.....
    
  3. 看看 worker 收到了什么:

    # 第一个 worker:
     [x] 接收到 'First message.'
     [x] 完成
     [x] 接收到 'Third message...'
     [x] 完成
     [x] 接收到 'Fifth message.....'
     [x] 完成
     
    # 第二个 worker:
     [x] 接收到 'Second message..'
     [x] 完成
     [x] 接收到 'Fourth message....'
     [x] 完成
    

    默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个 worker。一般来说,每个 worker 都会收到相同数量的任务。这种分发任务的方式称为轮询

3.3.2 消息确认

在上面的代码中,如果一个 worker 在执行任务的过程中被终止了,那么任务会丢失。为了避免任务丢失,RabbitMQ 提供了消息确认机制:

  • 如果 worker 回复一个 ack(acknowledgement),那么 RabbitMQ 就认为该任务被成功处理,然后从任务队列删除该任务
  • 如果一个 worker 没有回复 ack 就终止运行了,那么 RabbitMQ 就认为该任务执行失败,会将该任务重新排队,分配给其他运行中的 worker

消息确认有一个超时时间,默认为 30分钟。它能帮助我们检测异常的 worker。

默认情况下,手动消息确认是打开的。在前面的例子中,我们通过auto_ack=True关闭了它们。

  1. 继续修改代码,移除关闭消息确认的参数:

    def callback(ch, method, properties, body):
        print(" [x] 接收到 %r" % body.decode())
        time.sleep(body.count(b'.'))
        print(" [x] 完成")
        ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送 ack
    
    channel.basic_consume(queue='hello', on_message_callback=callback)
    

    使用这段代码后,即使你用 CTRL+C 终止一个正在处理任务的 worker,也不会有任何损失。

3.3.3 消息持久化

我们已经学会了如何确保即使 worker 意外终止,任务也不会丢失。但是如果RabbitMQ 服务器停止,我们的任务仍然会丢失。要确保任务不会丢失,需要做两件事:将队列和任务都标记为持久。

  1. 标记队列为持久,以便 RabbitMQ 重启后队列能够存活:

    # 因为 hello 队列已经存在,所以我们换了个新队列 task_queue
    channel.queue_declare(queue='task_queue', durable=True)
    

    注意!持久化标记要在生产者和消费者上都标记上。

  2. 标记任务为持久:

    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                          ))
    

将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接收到消息并且还没有保存消息时,仍然会有一个很短的空档。

此外,RabbitMQ 不会对每条消息做fsync(2)处理——它可能只是被保存在缓存中,而不是真正写入磁盘。

持久性保证不是强的,但对于我们的简单任务队列来说已经足够了。如果你需要一个更强的保证,那么你可以使用 publisher confirm

3.3.4 公平调度

RabbitMQ 默认均匀地分配任务,即使有的任务特别耗时,使得分配到该任务地 worker 任务已经堆积如山,RabbitMQ 还是在均匀分配。

这是因为 RabbitMQ 在任务进入队列时只进行分配,不查看 worker 未确认消息的数量,盲目地将第 n 个任务发送给第 n 个 worker。不过,我们可以通过设置,改变这种行为。

  1. 在 worker 中配置公平调度:

    channel.basic_qos(prefetch_count=1)
    

    上面地数字是1,也就是说在 worker 处理并确认前一个任务完成之前,不要向它分配新任务,而是将任务发送给下一个没有工作中的 worker。

如果所有的 worker 都很忙,任务队列就会排满。你需要注意这一点,并尽可能添加更多的工作人员,或者使用消息 TTL

3.4 发布/订阅(Publish/Subscribe)

发布者(即生产者)发送一条消息,每一个订阅者(即消费者)都能接收到,这种模式称为“发布/订阅”

3.4.1 交换器(Exchanges)

之前我们说:生产者发送的消息被保存到队列中。这只是为了好理解而这么说的。实际上,生产者从不直接向队列发送任何消息,而是发送给交换器(exchange)。由交换器决定消息下一步去往何处,具体的规则由交换器类型定义。

交换类型有 direct、topic、header 和 fanout。创建一个 fanout 类型的交换器,取名为logs

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

fanout 交换器十分简单,它只是单纯地把消息发送给它所知道的队列

将消息发送到创建好的交换器:

channel.basic_publish(exchange='logs',  # 通过名字指定交换器
                      routing_key='',
                      body=message)

3.4.2 临时队列

之前我们使用的都是命名的队列,像hello、task_queue。如果我们创建队列时不传入名称会怎样呢?答案是 RabbitMQ 会创建一个临时队列,该队列的名称会像这样:“mq.gen-JzTY20BRgKO-HjmUJj0wLg”。

我们还可以传入一个exclusive=True实现独占效果:当消费者关闭连接后,该队列也会随之消失

创建一个独占的临时队列:

result = channel.queue_declare(queue='', exclusive=True)

3.4.3 绑定

接收者中绑定交换器和队列,也就是订阅:

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

之后,logs 交换器将向我们的临时队列添加消息。只要接收者都绑定同一个交换器和队列,就都能收到消息。

我们还可以通过以下命令查看绑定信息(前提是接收者运行中):

sudo rabbitmqctl list_bindings

3.5 路由(Routing)

目前,我们的发布/订阅模型中,订阅者会毫无选择地接收发布者的所有消息。不过我们可以在绑定时,通过routing_key参数进行过滤,让订阅者选择想要的消息接收。

routing_key的含义取决于交换器的类型,之前使用的 fanout 类型会直接忽视这个参数。所以,需要使用其他类型的交换器。比如,Direct 交换器。

3.5.1 direct 交换器

它背后的算法很简单:

  1. 先看basic_publish()routing_key是否和queue_bind()routing_key匹配;
  2. 匹配就放入队列,不匹配就去匹配其他队列;
  3. 没有匹配的就丢弃消息。
# 发布消息
channel.basic_publish(exchange='direct_logs',
                      routing_key='black',
                      body=message)

# 绑定
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

RabbitMQ 允许使用同一个routing_key绑定多个队列。交换器会将消息发送给多个匹配的队列。

3.5.2 topic 交换器

topic 交换器的routing_key必须是用.分割的多个词语,如orange.rabbit.lazy,最长为255个字节。发布时的routing_key为可以按照.分割。如果二者之间匹配,则将消息发送给队列,如果不匹配则丢弃。这和 direct 交换器是一样的。但topic 交换器真正的用处在于两个特殊符号:

  • *:代表匹配任意一个单词。比如,*.orange.*可以匹配到中间是orange的单词数量为3个的任意routing_key
  • #:代表匹配任意多个单词。比如,rabbit.#可以匹配到rabbit开头的任意routing_key,且单词数量在合法范围内没有限制。

以上是关于消息队列:RabbitMQ安装和快速入门的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ消息中间件快速入门

快速入门分布式消息队列之 RabbitMQ(中)

RabbitMQ 第二课 快速入门

RabbitMQ 第二课 快速入门

快速入门分布式消息队列之 RabbitMQ(下)

消息队列RabbitMq入门