python-RabbtiMQ消息队列

Posted _tengmu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-RabbtiMQ消息队列相关的知识,希望对你有一定的参考价值。

1.RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

 

2.RabbitMQ能为你做些什么?

 

消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.

或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。

RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。

 

3.RabbitMQ 安装使用

 

 

4.Python应用RabbitMQ

python操作RabbitMQ的模块有三种:pika,Celery,Haigha。
本文使用的是pika。
"""
RabbitMQ-生产者。
"""

import pika

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)

"""声明一个管道"""
channel = connection.channel()

"""定义一个queue,定义queue名称,标识"""
channel.queue_declare(queue=hello)



"""定义queue中的消息内容"""
channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body=Hello World!)

print(" [x] Sent ‘Hello World!‘")
"""
RabbitMQ-消费者。
"""

import pika

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)

"""声明一个管道"""
channel = connection.channel()

"""定义一个queue,定义queue名称,标识,与生产者队列中对应"""
channel.queue_declare(queue=hello)

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)

"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback, #   如果收到消息,则回调这个函数处理消息
    queue=hello, # queue_declare(queue=‘hello‘) 对应
    no_ack=True
)

"""
    消费者会一直监听这queue,如果队列中没有消息,则会卡在这里,等待消息队列中生成消息。
"""

print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()

 

5.RabbitMQ消息持久化

技术分享图片
import pika


queue_name = xiaoxi_

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()
"""定义一个queue,定义queue名称,标识
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定义queue中的消息内容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=‘‘,
            routing_key=queue_name,
            body=input_value,
            properties=pika.BasicProperties(   # 消息持久化.....
                delivery_mode=2,
            )
        )
    continue
producer.py
技术分享图片
import pika,time
queue_name = xiaoxi_

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()
"""定义一个queue,定义queue名称,标识"""
channel.queue_declare(queue=queue_name)

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    #time.sleep(5) # 模拟消费者丢失生产者发送的消息,生产者消息队列中的这一条消息则不会删除。
    print(rev messages-->,body)

    """手动向生产者确认收到消息"""
    #ch.basic_ack(delivery_tag=method.delivery_tag)

"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback, #   如果收到消息,则回调这个函数处理消息
    queue=queue_name,
    #no_ack=True  #接收到消息,主动向生产者确认已经接收到消息。
)

print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

 

6.RabbitMQ消息公平分发

技术分享图片
import pika


queue_name = xiaoxi_1

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()
"""定义一个queue,定义queue名称,标识
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定义queue中的消息内容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=‘‘,
            routing_key=queue_name,
            body=input_value,
        )
    continue
producer.py
技术分享图片
import pika,time


queue_name = xiaoxi_1

"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()
"""定义一个queue,定义queue名称,标识
    queue,durable 持久化
"""
channel.queue_declare(queue=queue_name)


def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    """模拟处理消息快慢速度"""
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""根据消费者处理消息的快慢公平分发消息"""
channel.basic_qos(prefetch_count=1)


"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,则回调这个函数处理消息
    queue=queue_name,
   # no_ack=True  #接收到消息,主动向生产者确认已经接收到消息。
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

 

7.RabbitMQ-广播模式。

    消息的发送模式类型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
        2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
            表达式符号说明:#代表一个或多个字符,*代表任何字符
            例:#.a会匹配a.a,aa.a,aaa.a等
                  *.a会匹配a.a,b.a,c.a等
            注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
        4.headers: 通过headers 来决定把消息发给哪些queue (少用)

 

7.1 topic 广播模式。

技术分享图片
import pika


"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()


"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = topic_messages1
routing_key = my_topic


"""定义exchage模式 direct广播模式"""
channel.exchange_declare(exchange=exchange_name,exchange_type=topic)
"""
    消息的发送模式类型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
        2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
            表达式符号说明:#代表一个或多个字符,*代表任何字符
            例:#.a会匹配a.a,aa.a,aaa.a等
                  *.a会匹配a.a,b.a,c.a等
            注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
        4.headers: 通过headers 来决定把消息发给哪些queue (少用)
"""


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定义queue中的消息内容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=input_value,
        )
    continue
producer.py
技术分享图片
import pika,time



"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()

"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = topic_messages1
routing_key = my_topic

channel.exchange_declare(exchange=exchange_name,exchange_type=topic)


"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)


print(direct_key:{0}.format(routing_key))
print(queue_name:{0}.format(queue_name))

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,则回调这个函数处理消息
    queue=queue_name,
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

 

7.2 direct 广播模式

技术分享图片
import pika


connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
channel = connection.channel()


"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""
exchange_name = direct_messages
routing_key = my_direct

"""
    定义exchage模式 direct广播模式
    消息的发送模式类型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
        2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
            表达式符号说明:#代表一个或多个字符,*代表任何字符
            例:#.a会匹配a.a,aa.a,aaa.a等
                  *.a会匹配a.a,b.a,c.a等
            注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
        4.headers: 通过headers 来决定把消息发给哪些queue (少用)
"""
channel.exchange_declare(exchange=exchange_name,exchange_type=direct)

channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=hello word!,
)


# while True:
#     input_value = input(">>:").strip()
#     if input_value:
#         """定义queue中的消息内容"""
#         print(‘producer messages:{0}‘.format(input_value))
#         channel.basic_publish(
#             exchange=exchange_name,
#             routing_key=routing_key,
#             body=input_value,
#         )
#     continue
producer.py
技术分享图片
import pika,time



connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
channel = connection.channel()

exchange_name = direct_messages
routing_key = my_direct

channel.exchange_declare(exchange=exchange_name,exchange_type=direct)


"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)


print(direct_key:{0}.format(routing_key))
print(queue_name:{0}.format(queue_name))

def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,则回调这个函数处理消息
    queue=queue_name,
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

 

7.3 fanout 广播模式

技术分享图片
import pika


"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()


exchange_name = messages
"""定义exchage模式 fanout广播模式"""
channel.exchange_declare(exchange=exchange_name,exchange_type=fanout)
"""
    消息的发送模式类型
        1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
        2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
        3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
            表达式符号说明:#代表一个或多个字符,*代表任何字符
            例:#.a会匹配a.a,aa.a,aaa.a等
                  *.a会匹配a.a,b.a,c.a等
            注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
        4.headers: 通过headers 来决定把消息发给哪些queue (少用)
"""


while True:
    input_value = input(">>:").strip()
    if input_value:
        """定义queue中的消息内容"""
        print(producer messages:{0}.format(input_value))
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=‘‘,
            body=input_value,
        )
    continue
producer.py
技术分享图片
import pika,time



"""声明socket"""
connection = pika.BlockingConnection(
    pika.ConnectionParameters(localhost)
)
"""声明一个管道"""
channel = connection.channel()

"""
    
"""
exchange_name = messages
channel.exchange_declare(exchange=exchange_name,exchange_type=fanout)

"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""
res = channel.queue_declare(exclusive=True)
queue_name = res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name)
"""每一个消费者随机一个唯一的queue_name"""
print(queue_name:{0},format(queue_name))


def callback(ch,method,properties,body):
    print(rev-->,ch,method,properties,body)
    print(rev messages-->,body)
    ch.basic_ack(delivery_tag=method.delivery_tag)



"""消费,接收消息..."""
channel.basic_consume(
    consumer_callback=callback,  # 如果收到消息,则回调这个函数处理消息
    queue=queue_name,
   # no_ack=True  #接收到消息,主动向生产者确认已经接收到消息。
)


print(waiting for meassages, to exit press CTRL+C)
channel.start_consuming()
consumer.py

 

8 RabbitMQ 实现 RPC

技术分享图片
"""
RabbitMQ-生产者。
利用rabbitMQ 实现一个能收能发的RPC小程序。
重点需要注意的是:queue的绑定。接收的一端必选预先绑定queue生成队列,发送端才能根据queue发送。
"""

import pika,uuid,time


class rabbitmqClient(object):

    def __init__(self,rpc_queue):
        self.rpc_queue = rpc_queue
        self.app_id = str(uuid.uuid4())

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
        self.channel = self.connection.channel()

        """生成一个自动queue,传过去server,server再往这个自动queue回复数据"""
        autoqueue = self.channel.queue_declare(exclusive=True)
        self.callback_queue = autoqueue.method.queue

        """先定义一个接收回复的动作"""
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)


    def on_response(self,ch,method,properties,body):
        if self.app_id == properties.app_id:
            self.response = body



    def send(self,msg):

        self.response = None
        self.channel.basic_publish(
            exchange=‘‘,
            routing_key=self.rpc_queue,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                app_id=self.app_id,
            ),
            body=str(msg)
        )

        #   发送完消息,进入接收模式。
        while self.response is None:
            # print(‘callback_queue:{0} app_id:{1} wait...‘.format(self.callback_queue,self.app_id))
            self.connection.process_data_events()
            # time.sleep(0.5)
        return self.response



rpc_request_queue = rpc_request_queue

rb = rabbitmqClient(rpc_request_queue)

while True:
    msg = input(input >> :).strip()
    if msg:
        print(rpc_queue:{0} app_id:{1}.format(rb.rpc_queue,rb.app_id))
        print(send msg:{}.format(msg))
        reponses = rb.send(msg)
        print(reponses msg:{}.format(reponses.decode(utf-8)))
    continue
client.py
技术分享图片
"""
RabbitMQ-消费者。
"""

import pika

class rabbitmqServer(object):

    def __init__(self,rpc_queue):
        self.rpc_queue = rpc_queue

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
        self.channel = self.connection.channel()

        self.channel.queue_declare(queue=self.rpc_queue)


    def on_reponses(self,ch,method,properties,body):
        if body:
            # reponser ...
            ch.basic_publish(exchange=‘‘,
                             routing_key=properties.reply_to,
                             properties=pika.BasicProperties(
                                 reply_to=properties.reply_to,
                                 app_id=properties.app_id,
                             ),
                             body=reponses ok! msg is:{}.format(body.decode(utf-8)))


    def start_consuming(self):
        self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)
        print(waiting for meassages, to exit press CTRL+C)
        self.channel.start_consuming()




rpc_request_queue = rpc_request_queue

rd_server = rabbitmqServer(rpc_request_queue)

rd_server.start_consuming()
server.py

 

以上是关于python-RabbtiMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq - 不会获取队列中的所有消息

Android UI 线程消息队列调度顺序

# Java 常用代码片段

# Java 常用代码片段

RocketMQ - 如何用死信队列解决消费者异常

消息队列为什么要使用消息队列消息队列优缺点??