RabbitMQ基础知识

Posted jec1999

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基础知识相关的知识,希望对你有一定的参考价值。

RabbitMQ

关键在于消息的发布与消费、消息的路由。

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,可以视作Queue的name,
消费者将消息发送给Exchange时,一般会指定一个routing key
当binding key 与 routing key 相匹配时,消息就会被路由到对应的Queue中。

Exchange Types
fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
direct direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic 与direct类似,但是是模糊匹配,*”用于匹配一个单词,“#”用于匹配多个单词
binding key 类似 *.*.rabbit,routing key 为quick.orange.rabbit的消息会被路由到该Queue
headers headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。


对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

参考:http://www.diggerplus.org/archives/3110


开启RabbitMQ后台管理:
1.在rabbitMQ安装目录下的sbin目录,打开终端执行:rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ
2.浏览器中输入http://localhost:15672/
3.输入用户名和密码(默认为guest)

 

生产者

import pika

#########  生产者 #########
# 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 创建一个队列名叫test
channel.queue_declare(queue=test)

# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式

while True:  # 循环向队列中发送信息,quit退出程序
    inp = input(">>>").strip()
    if inp == quit:
        break
    channel.basic_publish(exchange=‘‘,
                          routing_key=test,
                          body=inp)
    print("生产者向队列发送信息%s" % inp)

# 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
connection.close()

# 输出结果
# >> > python
# 生产者向队列发送信息python
# >> > quit

 

消费者

#!/usr/bin/env python 3
import pika

######### 消费者 #########
# 链接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,所有消费者也创建这个队列,如果队列已经存在,则这条无效
channel.queue_declare(queue=test)


# 接收消息需要使用callback这个函数来接收,他会被pika库来调用,接受到的数据都是字节类型的
def callback(ch, method, properties, body):
    """
        ch : 代表 channel
        method :队列名
        properties : 连接rabbitmq时设置的属性
        body : 从队列中取到的内容,获取到的数据时字节类型
    """


    print(" [x] Received %r" % body)
# channel.basic_consume 表示从队列中取数据,如果拿到数据 那么将执行callback函数,callback是回调函数
# no_ack=True 表示消费完这个消息以后不主动把完成状态通知rabbitmq
channel.basic_consume(callback,
                      queue=test,
                      no_ack=True)
print( [*] 等待信息. To exit press CTRL+C)
# 永远循环等待数据处理和callback处理的数据,start_consuming方法会阻塞循环执行
channel.start_consuming()

# 输出结果,一直等待处理队列中的消息,不知终止,除非人为ctrl+c
#  [*]等待消息,To exit press CTRL+C
#  [x] Received b‘python‘

 

消费者acknowledgement消息不丢失的方法

# no_ack = False , 如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。在消费者端做设定条件。
# 生产者,代码同上,未改变
# 消费者代码


import pika
import time

# 链接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
# 创建频道
channel = connection.channel()
# 如果生产者没有运行创建队列,那么消费者创建队列,如果队列已存在,创建队列操作会被忽略
channel.queue_declare(queue=test)


# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(ok)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 当上面消息处理完成后,通知rabbitmq,消息处理完成,不要在发送了


channel.basic_consume(callback,
                      queue=test,
                      no_ack=False)  # 表示消费完这个消息后,主动通知rabbitmq完成状态,如果不通知,rabbitmq会把这条消息重新放回队列中,避免丢失

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()

 














以上是关于RabbitMQ基础知识的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

android小知识点代码片段

[vscode]--HTML代码片段(基础版,reactvuejquery)

RabbitMQ-从基础到实战— Hello RabbitMQ

[Go] 通过 17 个简短代码片段,切底弄懂 channel 基础

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段