rabbitMQ

Posted amyleell

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ相关的知识,希望对你有一定的参考价值。

rabbitMQ介绍及基本使用

官方文档看这里:

http://www.rabbitmq.com/getstarted.html

一、队列的介绍

1、什么是rabbitMQ?

   RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

   本质:socket之间的通信。

2、其他的消息队列?

  queue:内存中的,一般用于测试,将列表放入内存中。

  redis列表:非专业队列,可以做队列。

  rabbitMQ:专业队列,消息队列。可以做持久化,可以保证数据的安全。

  zeroMQ:基于内存的队列,比rabbitMQ更快,因为是 将数据放到内存。

 3、为什么要有消息队列?

  1 处理生产者消费者不对等的关系。

    可以根据生产者的多少(用户请求的多少),消费者动态的增加或减少。

  2 进行数据通信:

    - restfull  api 传送的通过http协议发送的json格式数据

    - webservice传送的通过http协议发送的xml格式数据(在rest api诞生之前做的web网站之间的进行数据交互是通过webservice做的,c#,java居多)

    - rpc(远程服务调用),基于socket并使用自己封装的协议进行数据传输,做数据交互。

      ‘处理者’在消息队列的一端等候任务到来,一旦有人发送请求就会被立即接收,并做处理。在消息队列旁再临时创建一个队列,发送请求之后就在这个队列一端进行等候,并通知处理者将消息处理完之后就放到这个队列中。任务处理完之后处理者将结果放到队列中,等发送者拿到消息之后就删除临时创建的队列。一般在公司内部做交互,通过rpc来进行通信,各个公司使用的协议不一定相同。升级版:给待处理信息写一个函数名,并传参数,让服务端找到这个函数,通过反射执行,获取结果,然后将结果放到队列中。

二、rabbit MQ的安装

服务端安装:

  

安装配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安装erlang
   $ yum -y install erlang
 
安装RabbitMQ
   $ yum -y install rabbitmq-server

   运行:

    方式1:rabbitmq-server (hang住) ctrl+c停止

    方式2:systemctl start rabbitmq-server(在后台进程运行)

    默认无密码,如果有密码:

    1 sudo rabbitmqctl add_user wupeiqi 123
    # 设置用户为administrator角色
    2 sudo rabbitmqctl set_user_tags peiqi administrator
    # 设置权限
    3 sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

    #重启:

    4 systemctl restart rabbitmq-server

  技术分享图片

客户端:

  pip3 install pika 

 

三、使用

基于Queue实现生产者消费者模型

技术分享图片
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
queue

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

技术分享图片

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# 创建一个队列:s91
channel.queue_declare(queue=s91)


# 向队列s91中发送一个 Hello World!
channel.basic_publish(exchange=‘‘,routing_key=s91,body=66)

connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()
channel.queue_declare(queue=s91)

def callback(ch, method, properties, body):
    print(body)


channel.basic_consume(callback,queue=s91,no_ack=True)

channel.start_consuming()
消费者 获取队列中的值
################生产者#########
import pika
#无密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

#创建队列
channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()

###############消费者#################
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()


channel.queue_declare(queue=‘hello‘)
#进行回调
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()

  

1、acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# 创建一个队列:s91
channel.queue_declare(queue=s91)


# 向队列s91中发送一个 Hello World!
channel.basic_publish(exchange=‘‘,routing_key=s91,body=66)

connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))
channel = connection.channel()

# channel.queue_declare(queue=‘s91‘)

def callback(ch, method, properties, body):
    print(body)
    ## 进行确认
    ch.basic_ack(delivery_tag=method.delivery_tag)   

channel.basic_consume(callback,queue=s91,no_ack=False)

channel.start_consuming()
消费者 进行确认

 2、durable   消息不丢失(服务端持久化)

  就算服务端或客户端挂掉,也没有关系,RabbitMQ会重新将该任务添加到队列中。

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# make message persistent
channel.queue_declare(queue=s92, durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=s92,
                      body=Hello World!,
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# make message persistent
channel.queue_declare(queue=s92, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue=s92,no_ack=False)


channel.start_consuming()
消费者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# make message persistent
channel.queue_declare(queue=s92, durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=s92,
                      body=Hello World!,
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

# make message persistent
channel.queue_declare(queue=s92, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=s92,no_ack=False)


channel.start_consuming()
消费者

4、发布订阅

技术分享图片

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

可有多个订阅者

 exchange type = fanout

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e1,exchange_type=fanout)

message = "Hello World!"

channel.basic_publish(exchange=e1,routing_key=‘‘,body=message)

connection.close()
发布者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e1,exchange_type=fanout)

# 随机生成对列 名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让队列和e1绑定
channel.queue_bind(exchange=e1,queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()
订阅者

5、关键字发送

技术分享图片

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e2,exchange_type=direct)

message = "Hello World!"

channel.basic_publish(exchange=e2,routing_key=error,body=message)

connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e2,exchange_type=direct)

# 随机生成对列名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让队列和e1绑定
channel.queue_bind(exchange=e2,queue=queue_name,routing_key=info)
channel.queue_bind(exchange=e2,queue=queue_name,routing_key=error)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()
消费者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e2,exchange_type=direct)

# 随机生成对列名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让队列和e1绑定
channel.queue_bind(exchange=e2,queue=queue_name,routing_key=error)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()
消费者2

6、模糊匹配

技术分享图片

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
1
2
3
发送者路由值              队列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e3,exchange_type=topic)

message = "Hello World!"

channel.basic_publish(exchange=e3,routing_key=info.xx.uu,body=message)

connection.close()
生产者
技术分享图片
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.13.92,credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange=e3,exchange_type=topic)

# 随机生成对列名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让队列和e1绑定
channel.queue_bind(exchange=e3,queue=queue_name,routing_key=info.*)



def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()
消费者 可有多个

注意:

技术分享图片
sudo rabbitmqctl add_user wupeiqi 123
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags wupeiqi administrator
# 设置权限
sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

# 然后重启rabbiMQ服务
sudo /etc/init.d/rabbitmq-server restart
 
# 然后可以使用刚才的用户远程连接rabbitmq server了。


------------------------------
credentials = pika.PlainCredentials("wupeiqi","123")

connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.14.47,credentials=credentials))
复制代码
设置链接密码
技术分享图片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
from pika.adapters.blocking_connection import BlockingChannel

credentials = pika.PlainCredentials("root", "123")

conn = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.20, credentials=credentials))
# 超时时间
conn.add_timeout(5, lambda: channel.stop_consuming())

channel = conn.channel()

channel.queue_declare(queue=hello)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    channel.stop_consuming()


channel.basic_consume(callback,
                      queue=hello,
                      no_ack=True)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
设置超时时间

终极归纳:

使用:
a. 普通消息队列

b. 批量向多个队列中发送(订阅者发布者)

c. 根据关键字匹配向队列中发送

d. 模糊匹配向队列中发送
1. exchange的作用?
- exchange和队列进行绑定
- 用户向队列发送数据时,无序再找队列,直接向exchange中发送即可。

2. rabbitmq中有几种exchange?
- fanout,只要绑定就发
- dirct,确定关键字
- topic,模糊匹配

3. 消息持久化和ack
- 服务端(durable)
- 客户端(ack)

4. 超时

5. 消息顺序

6. 其他消息队列?
- queue
- redis列表
- kafka
- zeromq

7. 什么时候用过消息队列?
- 防止消息堆积(消息提醒)
- 订单处理(celery+消息队列)

 

 

 

 

 

 

 

 

 





































以上是关于rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

带着新人学springboot的应用07(springboot+RabbitMQ 下)

RabbitMQ入门:Hello RabbitMQ 代码实例

rabbitmq演示代码

SpringBoot RabbitMQ 延迟队列代码实现

RabbitMQ代码第一步

RabbitMQ代码第一步