rabbitmq的简单介绍

Posted bai_nian_min_guo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq的简单介绍相关的知识,希望对你有一定的参考价值。

python操作rabbitmq的模块叫做pika

1个生产者对应1个消费者

如果生产者发送一条消息,那么消费者也只能接受到1条消息
如果生产者发送两条消息,那么消费者就可以接受到2条消息

1个生产者对应2个消费者
如果生产者发送1条消息,那么只有1个消费者能接受到消息
如果生产者发送2条消息,那么每个消费者都接受到1条消息



如果生产者先启动了,生产者的代码已经执行完成,程序退出,这个时候消费者才启动,消费者也能接受到消息



work queue
如果一个生产者对应多个消费者,那么生产者会依次把消息轮训到多个消费者上,实现一个负载均衡的效果



但是如果rabbitmq server重启后,则队列和消息就会丢失
1、队列持久化
durable=True
2、消息持久化
delivery_mode = 2


如果申明一个queue为持久化,那么就需要在服务端和客户端都需要设置

如果申明一个消息的持久化,加一个
delivery_mode = 2


消息公平分发
确保每个消费者同时只能有固定的数量的任务在处理,比如这个固定的任务是1或者2或者3之类的
这个需要在消费者端设置,设置我这个消费者同时只能处理几个任务
设置方法如下,这个1就是同时只能处理一个任务
channel.basic_qos(prefetch_count=1)



发布订阅
前面的一条消息只能被一个客户端消费,那么发布和订阅就是一条消息可以被
多个客户端消息,这里就需要用到“Exchange”,Exchange在定义的时候是有
类型的,以决定到底哪些queue符合条件,可以接受消息,一个exchange可以绑
定多个queue
fanout:所有绑定在这个exchange上的que都会接受到消息

direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息
可以根据关键字发送,即:队列绑定关键字,发送者将根据关键字发送消息
到指定的队列中




topic:所有符合routingKey(此时可以是一个表达式)的routingKey所绑定
的queue都可以接收到消息
表达式符合说明:#号代表一个或多个字符,*代表人和字符
例如
#a.a会匹配a.a,aa.a,aaa.a
*.a会匹配a.a,b.a,c.a
headers:通过headers来决定把消息发给哪些queue


1、先看下使用rabbitmq实现最简单的通信
先看生产端的代码
import pika
#操作rabbitmq的模块,官方的模块叫做pika


# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列


connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq


test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="hello2")
#在管道中申明一个队列,队列的名字就叫做hello

# test_channel.queue_declare(queue="hello",durable=True)
#设置hello这个queue为持久化,这个持久化是重启rabbitmq server的服务器重启,这个queue也不会丢失,需要在客户端和服务端都需要做持久化durable=True




#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化


test_channel.basic_publish(exchange="",
                           routing_key="hello2",
                           body="hello,my first python rabbitmq message 2")

# test_channel.basic_publish(exchange="",
#                            routing_key="hello",
#                            body="hello,my first python rabbitmq message 2")

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="hello2")
#在管道申明一个队列,在服务端已经申明了一个管道,名称为hello,这里为什么又要创建一个管道
#这里主要为了避免代码报错,如果客户端先执行,发现没有hello这个队列,则会报错,所以这里申
# 明一个队列,如果没有这个管道,则创建一个管道,如果有,就什么也不做,直接忽略了。

# def callback(body):
def callback(ch,method,properties,body):
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)


#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数


channel.basic_consume(callback,
                      queue="hello2",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

 

2、在看如何实现序列化持久化

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika


# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列


connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq


test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列


#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。







#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化


test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 2")

# test_channel.basic_publish(exchange="",
#                            routing_key="hello",
#                            body="hello,my first python rabbitmq message 2")

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)


#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数


channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

3、在看如何实现消息持久化

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika


# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列


connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq


test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列


#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。







#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化


test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 3",
                           properties=pika.BasicProperties(delivery_mode=2))

#properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后,
#直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)


#如果要实现消息持久化,需要在消费者端加这么一个行
# ch.basic_ack(delivery_tag=method.delivery_tag)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数


channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

 

4、再看如何用rabbitmaq实现

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika


# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列


connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq


test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列


#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。







#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化


test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 3",
                           properties=pika.BasicProperties(delivery_mode=2))

#properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后,
#直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

  

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)

channel.basic_qos(prefetch_count=1)

#如果要实现消息持久化,需要在消费者端加这么一个行
# ch.basic_ack(delivery_tag=method.delivery_tag)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数


channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

5、在看如何使用rabbitmq实现全部广播

先看生产端的代码

import pika
import sys


# 如果是fanout,则不需要申明queue

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

message = "".join(sys.argv[1:]) or "info:hello world!"

channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message)

print("[x] send %r" % message)

client_connection.close()

  

再看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的
channel = client_connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")


result = channel.queue_declare(exclusive=True)
#不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者
#断开后自动被删除

queue_name = result.method.queue

channel.queue_bind(exchange="logs",
                   queue=queue_name)
#把queue绑定到exchange上,他才能接受消息



print("waiting for message")
def callback(ch,method,properties,body):
    print("receive message %r" %body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

 

 

6、在看使用rabbitmap实现指定的广播

先看发送端的代码

import pika
import sys


#这个的意思是,把队列绑定在一个管道中,不同的客户端把这个队列申明为不同的key,在生产端
#可以指定这个消息发送到队列中的哪些key中



#根据关键字把消息发送到指定的队列中

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的


channel = client_connection.channel()
#创建一个管道


channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")

#申明管道的类型

severity = sys.argv[1] if len(sys.argv) > 1 else "info"
# 指定往哪个队列中发,如果没有指定,则往info里发




message = "".join(sys.argv[1:]) or "hello world"
channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      #往队列中的那个routing_key中发
                      body=message)

print("[x] send %r" % message)

client_connection.close()

  

 

在看接收端的代码

import pika
import sys


client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的
channel = client_connection.channel()

channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")


result = channel.queue_declare(exclusive=True)
#不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者
#断开后自动被删除

queue_name = result.method.queue


severities = sys.argv[1:]

if not severities:
    sys.stderr.write("Usage: %s [info] [warning ] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange="direct_logs",
                       queue=queue_name,
                       routing_key=severity)
#把队列申明为指定的路由key,客户端申明为路由key为1,那么只有服务端发送到队列中路由key为1
# 的客户端,才能收到消息



print("waiting for message")
def callback(ch,method,properties,body):
    print("receive message %r" %body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

  



以上是关于rabbitmq的简单介绍的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq简单介绍

初识RabbitMQ系列之一:简单介绍

RabbitMQ简单介绍

简单介绍下怎么在spring中使用RabbitMQ

深入了解RabbitMQ工作原理及简单使用

消息队列介绍RabbitMQ&Redis的重点介绍与简单应用