python RabbitMQ队列使用(入门篇)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python RabbitMQ队列使用(入门篇)相关的知识,希望对你有一定的参考价值。

---恢复内容开始---

python RabbitMQ队列使用

关于python的queue介绍

关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯。

RabbitMQ

MQ并不是python内置的模块,而是一个需要你额外安装(ubunto可直接apt-get其余请自行百度。)的程序,安装完毕后可通过python中内置的pika模块来调用MQ发送或接收队列请求。接下来我们就看几种python调用MQ的模式(作者自定义中文形象的模式名称)与方法。

 RabbitMQ设置远程链接账号密码

启动rabbitmq web服务:
2.远程访问rabbitmq:自己增加一个用户,步骤如下:
l1. 创建一个admin用户:sudo rabbitmqctl add_user admin 123123
l2. 设置该用户为administrator角色:sudo rabbitmqctl set_user_tags admin administrator
l3. 设置权限:sudo rabbitmqctl set_permissions -p ‘/‘ admin ‘.‘ ‘.‘ ‘.‘
l4. 重启rabbitmq服务:sudo service rabbitmq-server restart
之后就能用admin用户远程连接rabbitmq server了。

轮询消费模式

此模式下,发送队列的一方把消息存入mq的指定队列后,若有消费者端联入相应队列,即会获取到消息,并且队列中的消息会被消费掉。

若有多个消费端同时连接着队列,则会已轮询的方式将队列中的消息消费掉。

接下来是代码实例:

producer生产者

# !/usr/bin/env python
import pika
credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

# 声明queue
channel.queue_declare(queue=balance)

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
                      routing_key=balance,
                      body=Hello World!)
print(" [x] Sent ‘Hello World!‘")
connection.close()

发送过队列后,可在MQ服务器中查看队列状态

[[email protected] ~]# rabbitmqctl list_queues
Listing queues ...
hello    1

consumer消费者

# _*_coding:utf-8_*_
__author__ = Alex Li
import pika

credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

# You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue=balance)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue=balance,
                      no_ack=True)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()

接收队列后,查看一下队列状态

[[email protected] ~]#  rabbitmqctl list_queues
Listing queues ...
hello    0

 

队列持久化

当rabbitMQ意外宕机时,可能会有持久化保存队列的需求(队列中的消息不消失)。

producer

# Cheng
# !/usr/bin/env python
import pika

credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

# 声明queue
channel.queue_declare(queue=durable,durable=True)

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
                      routing_key=durable,
                      body=Hello cheng!,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent ‘Hello cheng!‘")
connection.close()

执行后查看队列,记下队列名字与队列中所含消息的数量

[[email protected] ~]# rabbitmqctl list_queues
Listing queues ...
durable    1
#重启rabbitmq
[[email protected]
~]# systemctl restart rabbitmq-server
#重启完毕后再次查看 [[email protected]
~]# rabbitmqctl list_queues Listing queues ... durable #队列以及消息并未消失

执行消费者代码

cunsumer

# Cheng
# _*_coding:utf-8_*_
__author__ = Alex Li
import pika

credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

# You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But were not yet sure which program to run first. In such cases its a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue=durable,durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue=durable,
                      #no_ack=True
                      )

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()

可正确接收到信息。

再次查看队列的情况。

[[email protected] ~]# rabbitmqctl list_queues
Listing queues ...
durable    0

广播模式

当producer发送消息到队列后,所有的consumer都会收到消息,需要注意的是,此模式下producer与concerned之间的关系类似与广播电台与收音机,如果广播后收音机没有接受到,那么消息就会丢失。

建议先执行concerned

concerned

# _*_coding:utf-8_*_
__author__ = Alex Li
import pika

credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

channel.exchange_declare(exchange=Clogs,
                         type=fanout)

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange=Clogs,
                   queue=queue_name)

print( [*] Waiting for logs. To exit press CTRL+C)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

producer

import pika
import sys

credentials = pika.PlainCredentials(admin,123456)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    192.168.56.19,5672,/,credentials))
channel = connection.channel()

channel.exchange_declare(exchange=Clogs,
                         type=fanout)

message =  .join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=Clogs,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

 

初级的几种使用rabbitMQ的方式就介绍到这里,下篇博客介绍高级用法。














以上是关于python RabbitMQ队列使用(入门篇)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ入门到精通(入门篇)

RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

RabbitMQ 超详细入门篇

[RabbitMQ+Python入门经典] 兔子和兔子窝

消息队列RabbitMq入门

RabbitMQ入门教程(十七):消息队列的应用场景和常见的消息队列之间的比较