RabbitMQ消息队列

Posted attila

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。

RabbitMQ消息队列

  !!!  注意,保证服务器的内存足够,磁盘足够,以及删除/etc/hosts中没有用的dns解析

# 优点,能够保证消息数据持久化,不丢失,支持高并发

安装学习rabbitmq消息队列,配置好阿里云的yum源

1.yum -y install erlang  rabbitmq-server

2.启动rabbitmq服务端
systemctl start rabbitmq-server

3.开启rabbitmq的web控制台
rabbitmq-plugins enable rabbitmq_management

4.重启后生效web界面
systemctl restart rabbitmq-server
# 查看是否生效,出现登录页面就成功了
http://192.168.81.130:15672/    # 端口必须是这个

5.创建rabbitmq用户
sudo rabbitmqctl add_user attila 666
?
6.设置用户的权限,为admin管理员权限
sudo rabbitmqctl set_user_tags attila administrator

7.允许attila这个用户对所有的队列进行读写
sudo rabbitmqctl set_permissions -p "/" attila ".*" ".*" ".*"

8.可以用attila登录 rabbitmq的后台界面管理了

  no-ack机制

    不确定机制,就是每次消费者接收完数据后,不管是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除

# 建一个py文件,叫做生产者.py
#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
# 去邮局取邮件,必须得验证身份
credentials = pika.PlainCredentials("attila","666")
# 新建连接,这里localhost可以更换为服务器ip
# 找到这个邮局,等于连接上服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.81.130,credentials=credentials))
# 创建频道
# 建造一个大邮箱,隶属于这家邮局的邮箱,就是个连接
channel = connection.channel()
# 声明一个队列,用于接收消息,队列名字叫“笑傲江湖”
channel.queue_declare(queue=笑傲江湖)
# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=‘‘),
  它允许我们精确的指定发送给哪个队列(routing_key=‘‘),参数body值发送的数据
channel.basic_publish(exchange=‘‘,
                      routing_key=笑傲江湖,
                      body=独孤九剑潇洒非凡)
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()
# 启动py文件,查看浏览器上的RabbitMQ, http://192.168.81.130:15672/  必须是这个端口  此时,浏览器上应该有一条数据了

  生产者已经有了,这时候准备一个消费者

import pika
# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("attila","666")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.81.130,credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="笑傲江湖")

def callbak(ch,method,properties,body):
    print("消费者接收到了任务:%r"%body.decode("utf8"))
# 有消息来临,立即执行callbak,没有消息则夯住,等待消息
# 老百姓开始去邮箱取邮件啦,队列名字是水许传
channel.basic_consume(callbak,queue="笑傲江湖",no_ack=True)  # no_ack=True, 就是不需要回复,我只负责取走,不给你反馈
# 开始消费,接收消息
channel.start_consuming()

# 在浏览器上查看rabbitmq,发现消息没有了 

  ACK机制(用于单发送多接收)

一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。
  同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
# ACK机制用于保证消费者如果拿了队列的消息,客户端处理时出错了,那么队列中任然还存在这个消息,等待下一位消费者继续取.

  建一个 生产者.py文件

#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
# 比如去邮局取邮件,必须得验证身份
credentials = pika.PlainCredentials("attila","666")
# 新建连接,这里localhost可以更换为服务器ip
# 找到这个邮局,等于连接上服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.81.130,credentials=credentials))
# 创建频道
# 建造一个大邮箱,隶属于这家邮局的邮箱,就是个连接
channel = connection.channel()
# 新建一个hello队列,用于接收消息
# 这个邮箱可以收发各个班级的邮件,通过
channel.queue_declare(queue=笑傲江湖)
# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=‘‘),
  它允许我们精确的指定发送给哪个队列(routing_key=‘‘),参数body值发送的数据
channel.basic_publish(exchange=‘‘,
                      routing_key=笑傲江湖,
                      body=令狐冲独孤九剑笑傲江湖)
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()
# 启动py文件,查看浏览器上的RabbitMQ, http://192.168.81.130:15672/  此时,浏览器上应该有一条数据了

   创建一个消费者

import pika

credentials = pika.PlainCredentials("s14","123")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.119.10,credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue=金品没)

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body.decode("utf-8"))
    # int(‘嘻嘻嘻‘)   # 强转为了让他报错,但是消息取出来了,也就是没有消息了,那么这样会丢数据,所以需要把no-ack改为False
    # 我告诉rabbitmq服务端,我已经取走了消息
    # 回复方式在这
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 关闭no_ack,代表给与服务端ack回复,确认给与回复,这样如果取数据过程中出错,rabbitmq中消息不会被取走.
channel.basic_consume(callback,queue=金品没,no_ack=False)

channel.start_consuming()

  消息持久化

# 演示
1.执行生产者,向队列写入数据,产生一个新队列queue
2.重启服务端,队列丢失

3.开启生产者数据持久化后,重启rabbitmq,队列不丢失
4.依旧可以读取数据

  创建可持久化的队列

    生产者.py

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.81.130‘))
# 有密码
credentials = pika.PlainCredentials("attila","666")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.81.130,credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
# 默认此队列不支持持久化,如果服务挂掉,数据丢失
# durable=True 开启持久化,必须新开启一个队列,原本的队列已经不支持持久化了
‘‘‘
实现rabbitmq持久化条件
 delivery_mode=2
使用durable=True声明queue是持久化
 
‘‘‘
channel.queue_declare(queue=天龙八部,durable=True)
channel.basic_publish(exchange=‘‘,
                      routing_key=天龙八部, # 消息队列名称
                      body=降龙十八掌天下第一,
                      # 支持数据持久化
                      properties=pika.BasicProperties(
                          delivery_mode=2,#代表消息是持久的  2
                      )
                      )
connection.close()

  消费者.py

import pika
credentials = pika.PlainCredentials("attila","666")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.81.30,credentials=credentials))
channel = connection.channel()
# 确保队列持久化
channel.queue_declare(queue=天龙八部,durable=True)

‘‘‘
必须确保给与服务端消息回复,代表我已经消费了数据,否则数据一直持久化,不会消失
‘‘‘
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body.decode("utf-8"))
    # 模拟代码报错
    # int(‘asdfasdf‘)    # 此处报错,没有给予回复,保证客户端挂掉,数据不丢失
   
    # 告诉服务端,我已经取走了数据,否则数据一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 关闭no_ack,代表给与回复确认
channel.basic_consume(callback,queue=天龙八部,no_ack=False)
channel.start_consuming()

 

以上是关于RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

Rabbitmq 消息队列

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

RabbitMQ 消息队列学习

RabbitMQ确认机制问题处理