Rabbitmq

Posted pyrene

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq相关的知识,希望对你有一定的参考价值。

一、简介

解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议。 AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输。RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发 送方传输到接收方。简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方。

     RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

下面是rabbitmq 图示

二、安装

rabbitmq for windows

1 erlang:http://www.erlang.org/download/otp_win64_17.3.exe 2 3

下载地址:http://www.rabbitmq.com/download.html 

安装API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

 

rabbitmq for linux

1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3  
4 安装erlang
5    $ yum -y install erlang
6  
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server

 

安装API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

 

for mac:

http://my.oschina.net/u/998693/blog/547873

三、简单队列模型

基于Queue实现生产者消费者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
View Code

 

  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

基于rabbitmq的简单队列模型:在于交换机不工作的情况下

#/usr/bin/env python

import pika

# ######################### 生产者 #########################
#封装socket逻辑部分
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
#拿到操作句柄
channel = connection.channel()
#创建队列,并且给队列取一个名字
channel.queue_declare(queue=\'hello\')
#这里面三个参数,第一个是交换机,第二个参数指定队列,第三个参数是传递的数据
#一个队列的时候exchange交换机设置为空,routing_key指定把body(内容)放到那个队列里面
channel.basic_publish(exchange=\'\',
                      routing_key=\'hello\',
                      body=\'Hello World!\')
print(" [x] Sent \'Hello World!\'")
connection.close()
View Code

 

channel.queue_declare(queue=\'hello\')  这里创建队列的时候可能会出现durable为true的提示。

durable=Ture:就是创建持续化的队列

durable=False: 是创建非持续化的队列

如果创建channel.queue_declare(queue=\'hello\'durable=Ture)就是创建了持续化队列。如果创建了持续化队列,就不要创建非持续化的队列

 

生产者:

首先导入pika模块

1、  封装socket逻辑部分

2、  拿到操作句柄,创建队列并且给这个队列取一个名字

3、  进行操作把生产的内容放入到队列中,这里有三个参数

a)         第一个残害苏是交换机,如果只有一个队列交换机默认为空,

b)         第三个参数是需要传递的内容

c)         第二个队列是设置吧内容传递给那个队列

 1 #/usr/bin/env python
 2 
 3 import pika
 4 
 5 # ########################## 消费者 ##########################
 6 #封装socket逻辑部分
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(
 8         host=\'localhost\'))
 9 #生成句柄
10 channel = connection.channel()
11 #创建队列,内部做了判断,如果没有这个队列会创建,如果已经有了队列就不会创建
12 channel.queue_declare(queue=\'hello\')
13 
14 def callback(ch, method, properties, body):
15     print(" [x] Received %r" % body)
16 
17 #回调函数,首先去指定队列中获取数据,放到body中,然后执行回调函数
18 #no_ack:等于True是无应答,如果为false就是有应答
19 #有应答是用户如果连接的时候突然断了,这个时候在队列中这个数据还保存,用户连接的时候还有
20 #这个队列会一直等这个用户。这个是比较安全的,但是效率不会太高
21 channel.basic_consume(callback,
22                       queue=\'hello\',
23                       no_ack=True)
24 
25 print(\' [*] Waiting for messages. To exit press CTRL+C\')
26 channel.start_consuming()
消费者

1、  首先导入pika模块

2、  封装socket逻辑部分,并且声称句柄

3、  创建队列(内部会做一个判断,如果这个队列没有那么就会创建这个队列,如果有就不会创建)

4、  执行回调函数来获取内容

a)         第一个参数是回调函数

b)         第二个参数指定队列,把获取的内容放入到回调函数中

c)         第三个参数应答参数用来设置是否有应答(消息丢不丢失)

四、重要参数

1、  acknowledgment消息丢不丢失(这个参数是用户如果崩溃异常消息丢不丢失)

要设置下面两部分内容:首先设置no_ack=False,然后设置下面黄色部分

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host=\'10.211.55.4\'))

channel = connection.channel()

 

channel.queue_declare(queue=\'hello\')

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print \'ok\'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_consume(callback,

                      queue=\'hello\',

                      no_ack=False)

 

print(\' [*] Waiting for messages. To exit press CTRL+C\')

channel.start_consuming()

第二个参数:

durable 消息不丢失:这个是rabbitmq如果崩溃异常,数据会不会丢失

1、  在生产者中把下面黄色部分设置 :这样重启的时候数据会恢复原因是吧数据从内存放入到了磁盘

a)         创建队列的时候durable=True

b)         把数据放入到队列中的时候把delivery_mode=2

#!/usr/bin/env python

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue=\'hello\', durable=True)

 

channel.basic_publish(exchange=\'\',

                      routing_key=\'hello\',

                      body=\'Hello World!\',

                      properties=pika.BasicProperties(

                          delivery_mode=2, # make message persistent

                      ))

print(" [x] Sent \'Hello World!\'")

connection.close()

消费者

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue=\'hello\', durable=True)

 

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print \'ok\'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_consume(callback,

                      queue=\'hello\',

                      no_ack=False)

 

print(\' [*] Waiting for messages. To exit press CTRL+C\')

channel.start_consuming()

消费获取顺序

在rabbitmq中默认的消息队列中的数据是按照顺序被消费者拿走的。假如有三个消费者,默认按照顺序在队列中获取数据,但是由于机器配置等原因,有些会拿的慢,有些拿的快,默认情况下,拿的快的机器会等待拿的慢的机器获取完毕后才去获取新数据,这样就造成了机器闲置状态。如果想打破这种默认模式,而是谁获取完毕之后谁就去获取新数据,这样效率就会提高。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照顺序排列获取数据

这条数据要放到消费者里面。如下:黄色部分

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host=\'10.211.55.4\'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue=\'hello\')

 

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print \'ok\'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

 

channel.basic_consume(callback,

                      queue=\'hello\',

                      no_ack=False)

 

print(\' [*] Waiting for messages. To exit press CTRL+C\')

channel.start_consuming()

五、exchange工作模型(fanout,direct,topic)

1、 发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中

exchange type = fanout

 

发布者:让数据发送到交换机中,然后由交换机决定让数据放入到那个队列中

发布者

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
#exchange=”logs”表示给交换机创建一个名字,创建了一个交换机,
channel.exchange_declare(exchange=\'logs\',
                         type=\'fanout\')
#把数据
message = \' \'.join(sys.argv[1:]) or "info: Hello World!"
#把数据放入到交换机中,这里不需要指定队列,所以routing_key为空,第三个参数是吧数据放入message
channel.basic_publish(exchange=\'logs\',
                      routing_key=\'\',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
订阅者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
#1、创建交换机
channel.exchange_declare(exchange=\'logs\',
                         type=\'fanout\')
#2、随机创建一个队列
result = channel.queue_declare(exclusive=True)
#3、给创建的队列随机命名
queue_name = result.method.queue

#4、创建的队列进行绑定交换机(拿到队列的名字,交换机的名字进行绑定)
channel.queue_bind(exchange=\'logs\',
                   queue=queue_name)

print(\' [*] Waiting for logs. To exit press CTRL+C\')
#7、获取了数据然后执行
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
#6、这里获取生产者的数据,然后执行callback函数
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

#5、这里进行一个阻塞,直到生产者放出一个数据的时候就会执行上面的方法
channel.start_consuming()
View Code

 

生产者启动的时候会创建交换机,然后把数据放入到交换机中,然后消费者启动的时候首先创建队列,并且检查有没有交换机,如果有交换机就进行绑定,如果没有就需要自己创建一个交换机,然后进行阻塞等待生产者把数据放入到交换机中,如果有数据,那么消费者就会从交换机中获取数据通过队列获取

---------------------------------------------------------------------------------------------------------------------

2关键字发送

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

exchange type = direct

详细解释:生产者创建交换器,消费者创建队列绑定队列并且绑定交换器。这个时候队列还需要绑定关键字,如果绑定的有关键字,那么生产者发送消息到交换器,交换器根据哪个队列绑定的有关键字,然后把数据发送到那个队列,然后那个消费者绑定这个交换器,那么那个消费者就能获取到这个数据

注意:一个队列可以绑定多个字符串

 

消费者

绑定交换器和队列的时候绑定关键字

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
 
channel.exchange_declare(exchange=\'direct_logs\',
                         type=\'direct\')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
   #这里绑定的时候除了绑定交换机,和队列名字,还要绑定关键字,上面进行了for循环,所以这里是绑定了多个关键字
    channel.queue_bind(exchange=\'direct_logs\',
                       queue=queue_name,
                       routing_key=severity)
 
print(\' [*] Waiting for logs. To exit press CTRL+C\')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

生产者:

1、创建交换器,并且让参数为direct

2、设置一个或者多个关键字

3、让关键字放入到交换器中

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
 
channel.exchange_declare(exchange=\'direct_logs\',
                         type=\'direct\')
 
severity = sys.argv[1] if len(sys.argv) > 1 else \'info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(exchange=\'direct_logs\',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

--------------------------------------------------------------------------------------------------------------------------

3、模糊匹配

 

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • * 表示只能匹配 一个 单词

 

发送者路由值              队列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

 

模糊匹配和关键词匹配除了参数不一样其他的都是一样的

生产者

1、  创建交换器,并且加入模糊匹配的参数

2、  把关键字放入到交换器

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
 
channel.exchange_declare(exchange=\'topic_logs\',
                         type=\'topic\')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else \'anonymous.info\'
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
channel.basic_publish(exchange=\'topic_logs\',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

消费者

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=\'localhost\'))
channel = connection.channel()
 
channel.exchange_declare(exchange=\'topic_logs\',
                         type=\'topic\')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange=\'topic_logs\',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(\' [*] Waiting for logs. To exit press CTRL+C\')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

六、rabbitmq的应用

http://www.rabbitmq.com/getstarted.html  这是rabbitmq教程

 

处理大并发设计模式,用户访问并且操作web服务器的时候,web服务器只是把用户访问操作等数据 提交到队列中,并且同时在数据库中给数据库给这个请求标记状态。

根据队列的的属性,后台服务器按照顺序处理这些请求,处理之后在数据库中返回其状态,这样用户刷新访问数据库就能看到是否处理了用户的请求。这样可以减轻web服务的并发。例子(活动抢票)

队列的应用之rpc

通过rabbitmq实现远程调用模式

用户想要调用远程多个服务器,这个时候可以产生两个队列,其中A这个端口远程服务器一直连接等待用户输入内容,其中一个队列B通过主机端口等来给远程服务器发送指令,然后服务器接收到指令之后通过用户产生的另一个队列B给用户返回消息,如果用户不想收到消息,直接把创建的另一个队列B删除掉就可以了

 

以上是关于Rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

带着新人学springboot的应用07(springboot+RabbitMQ 下)

RabbitMQ入门:Hello RabbitMQ 代码实例

rabbitmq演示代码

SpringBoot RabbitMQ 延迟队列代码实现

RabbitMQ代码第一步

RabbitMQ代码第一步