Rabbitmq 介绍 安装基于Queue实现生产者消费者模型基本使用消息安全之ackdurable持久化利用闲置消费发布订阅发布订阅高级之Royting(按关键字匹配)Topic关键字模糊匹配基于r

Posted 人生苦短,我用python

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq 介绍 安装基于Queue实现生产者消费者模型基本使用消息安全之ackdurable持久化利用闲置消费发布订阅发布订阅高级之Royting(按关键字匹配)Topic关键字模糊匹配基于r相关的知识,希望对你有一定的参考价值。

师承老刘llnb

一、消息队列介绍

1.1介绍

消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”

1.2MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。

当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性

流量削峰
举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
消息分发
多个服务队数据感兴趣,只需要监听同一类消息即可处理。

例如A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码。很不方便。

有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息。C感兴趣,C也去监听消息。A服务作为基础服务完全不需要有改动
异步消息

有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅

使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。

这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息

1.3 常见消息队列及比较


结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka

二 Rabbitmq安装

官网:https://www.rabbitmq.com/getstarted.html

2.1 服务端原生安装

# 安装配置epel源
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server

2.2服务端Docker安装

docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:managemen

2.3客户端安装

pip3 install pika

2.4 设置用户和密码

rabbitmqctl add_user lqz 123
# 设置用户为administrator角色
rabbitmqctl set_user_tags lqz administrator
# 设置权限
rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

# 然后重启rabbiMQ服务
systemctl reatart rabbitmq-server
 
# 然后可以使用刚才的用户远程连接rabbitmq server了。

三 基于Queue实现生产者消费者模型

import Queue
import threading

message = Queue.Queue(10)

def producer(i):
    while True:
        message.put(i)

def consumer(i):
    while True:
        msg = message.get()

for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

四 基本使用(生产者消费者模型)

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(\'127.0.0.1\'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue=\'lqz\')

channel.basic_publish(exchange=\'\',
                      routing_key=\'lqz\', # 消息队列名称
                      body=\'hello world\')
connection.close()

消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue=\'lqz\')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=\'lqz\',on_message_callback=callback,auto_ack=True)

channel.start_consuming()

五 消息安全之ack

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(\'127.0.0.1\'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue=\'lqz\')

channel.basic_publish(exchange=\'\',
                      routing_key=\'lqz\', # 消息队列名称
                      body=\'hello world\')
connection.close()

消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue=\'lqz\')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=\'lqz\',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

六 消息安全之durable持久化

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(\'127.0.0.1\'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue=\'lqz1\',durable=True)

channel.basic_publish(exchange=\'\',
                      routing_key=\'lqz1\', # 消息队列名称
                      body=\'111\',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )
connection.close()

消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue=\'lqz1\')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=\'lqz1\',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

七 闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者

但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息
生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(\'127.0.0.1\'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue=\'lqz123\',durable=True)

channel.basic_publish(exchange=\'\',
                      routing_key=\'lqz123\', # 消息队列名称
                      body=\'111\',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )
connection.close()

消费者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
# channel.queue_declare(queue=\'lqz123\')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue=\'lqz123\',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

八 发布订阅

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange=\'m1\',exchange_type=\'fanout\')

channel.basic_publish(exchange=\'m1\',
                      routing_key=\'\',
                      body=\'lqz nb\')

connection.close()

订阅者(启动几次订阅者会生成几个队列)

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# exchange=\'m1\',exchange(秘书)的名称
# exchange_type=\'fanout\' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange=\'m1\',exchange_type=\'fanout\')

# 随机生成一个队列
result = channel.queue_declare(queue=\'\',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange=\'m1\',queue=queue_name)


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

九 发布订阅高级之Routing(按关键字匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange=\'m2\',exchange_type=\'direct\')

channel.basic_publish(exchange=\'m2\',
                      routing_key=\'bnb\', # 多个关键字,指定routing_key
                      body=\'lqz nb\')

connection.close()

订阅者1

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# exchange=\'m1\',exchange(秘书)的名称
# exchange_type=\'direct\' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange=\'m2\',exchange_type=\'direct\')

# 随机生成一个队列
result = channel.queue_declare(queue=\'\',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange=\'m2\',queue=queue_name,routing_key=\'nb\')
channel.queue_bind(exchange=\'m2\',queue=queue_name,routing_key=\'bnb\')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# exchange=\'m1\',exchange(秘书)的名称
# exchange_type=\'direct\' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange=\'m2\',exchange_type=\'direct\')

# 随机生成一个队列
result = channel.queue_declare(queue=\'\',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange=\'m2\',queue=queue_name,routing_key=\'nb\')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

9.1发布订阅高级之Topic(按关键字模糊匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange=\'m3\',exchange_type=\'topic\')

channel.basic_publish(exchange=\'m3\',
                      # routing_key=\'lqz.handsome\', #都能收到
                      routing_key=\'lqz.handsome.xx\', #只有lqz.#能收到
                      body=\'lqz nb\')

connection.close()

订阅者1
只能加一个单词

可以加任意单词字符

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# exchange=\'m1\',exchange(秘书)的名称
# exchange_type=\'direct\' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange=\'m3\',exchange_type=\'topic\')

# 随机生成一个队列
result = channel.queue_declare(queue=\'\',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange=\'m3\',queue=queue_name,routing_key=\'lqz.#\')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# exchange=\'m1\',exchange(秘书)的名称
# exchange_type=\'topic\' , 模糊匹配
channel.exchange_declare(exchange=\'m3\',exchange_type=\'topic\')

# 随机生成一个队列
result = channel.queue_declare(queue=\'\',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange=\'m3\',queue=queue_name,routing_key=\'lqz.*\')


def callback(ch, method, properties, body):
  	queue_name = result.method.queue # 发送的routing_key是什么
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

十 基于rabbitmq实现rpc

服务端

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\',credentials=credentials))
channel = connection.channel()

# 起翰监听任务队列
channel.queue_declare(queue=\'rpc_queue\')

def on_request(ch, method, props, body):
    n = int(body)
    response = n + 100
    # props.reply_to  要放结果的队列.
    # props.correlation_id  任务
    ch.basic_publish(exchange=\'\',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume( queue=\'rpc_queue\',on_message_callback=on_request,)
channel.start_consuming()

客户端

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(\'10.0.0.200\', credentials=credentials))
        self.channel = self.connection.channel()

        # 随机生成一个消息队列(用于接收结果)
        result = self.channel.queue_declare(queue=\'\',exclusive=True)
        self.callback_queue = result.method.queue

        # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
        self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 客户端 给 服务端 发送一个任务:  任务id = corr_id / 任务内容 = \'30\' / 用于接收结果的队列名称
        self.channel.basic_publish(exchange=\'\',
                                   routing_key=\'rpc_queue\', # 服务端接收任务的队列名称
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue, # 用于接收结果的队列
                                         correlation_id = self.corr_id, # 任务ID
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return self.response

fibonacci_rpc = FibonacciRpcClient()

response = fibonacci_rpc.call(50)
print(\'返回结果:\',response)

SpringBoot整合RabbitMQ实现死信队列

文章目录


前面一文通过 Java整合RabbitMQ实现生产消费(7种通讯方式),本文基于SpringBoot实现RabbitMQ中的死信队列和延迟队列。

概念介绍

什么是死信

死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:

  1. 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
  2. 消息到达生存时间还未被消费。
  3. 队列超过长度限制,消息被丢弃。

这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图

死信队列应用

  • 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
  • 可以实现延迟消费功能。比如,订单15分钟内未支付。

注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:

  1. 维护多个队列,每个队列维护一个TTL时间。
  2. 使用延迟交换机。这种方式需要下载插件支持,参考链接:RabbitMQ插件

工程搭建

环境说明

  • RabbitMQ环境,参考RabbitMQ环境搭建
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

搭建步骤

  1. 创建SpringBoot项目。
  2. pom.xml文件导入RabbitMQ依赖。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. application.yml文件添加RabbitMQ配置。
spring:
  # rabbitmq配置信息 RabbitProperties类
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启confirm机制
    publisher-confirm-type: correlated
    # 开启return机制
    publisher-returns: true
    #全局配置,局部配置存在就以局部为准
    listener:
      simple:
        acknowledge-mode: manual # 手动ACK

实现死信

准备Exchange&Queue

@Configuration
public class RabbitMQConfig 

    /**
     * 正常队列
     */
    public static final String EXCHANGE = "boot-exchange";

    public static final String QUEUE = "boot-queue";

    public static final String ROUTING_KEY = "boot-rout";

    /**
     * 死信队列
     */
    public static final String DEAD_EXCHANGE = "dead-exchange";

    public static final String DEAD_QUEUE = "dead-queue";

    public static final String DEAD_ROUTING_KEY = "dead-rout";

    /**
     * 声明死信交换机
     *
     * @return
     */
    @Bean
    public Exchange deadExchange() 
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
    

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() 
        return QueueBuilder.durable(DEAD_QUEUE).build();
    


    /**
     * 绑定死信的队列和交换机
     *
     * @param deadExchange
     * @param deadQueue
     * @return
     */
    @Bean
    public Binding deadBind(Exchange deadExchange, Queue deadQueue) 
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    

    /**
     * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
     *
     * @return
     */
    @Bean
    public Exchange bootExchange() 
        return ExchangeBuilder.directExchange(EXCHANGE).build();
    

    /**
     * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
     * 绑定死信交换机及路由key
     *
     * @return
     */
    @Bean
    public Queue bootQueue() 
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                //.ttl(10000)
                //队列最大长度
                .maxLength(1)
                .build();
    

    /**
     * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
     *
     * @param bootExchange
     * @param bootQueue
     * @return
     */
    @Bean
    public Binding bootBind(Exchange bootExchange, Queue bootQueue) 
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    


监听死信队列

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
    public void listener_dead(String msg, Channel channel, Message message) throws IOException 
        System.out.println("死信接收到消息" + msg);
        System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
        System.out.println("messageID:" + message.getMessageProperties().getMessageId());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

方式一——消费者拒绝&否认

  • 拒绝消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException 
        System.out.println("接收到消息" + msg);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
    
  • 否认消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException 
        System.out.println("接收到消息" + msg);
 		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    

方式二——超过消息TTL

  • 发送消息时设置TTL
@SpringBootTest
public class Publisher 

    @Autowired
    private RabbitTemplate template;
        /**
     * 5秒未被消费会路由到死信队列
     */
    @Test
    public void publish_expir() 
        template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> 
            message.getMessageProperties().setExpiration("5000");
            return message;
        );
    

  • 设置队列所有消息的TTL
    更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改
    @Bean
    public Queue bootQueue() 
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                .ttl(10000)
                .build();
    

方式三——超过队列长度限制

设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。

    @Bean
    public Queue bootQueue() 
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                .maxLength(1)
                .build();
    

代码仓库

点我

以上是关于Rabbitmq 介绍 安装基于Queue实现生产者消费者模型基本使用消息安全之ackdurable持久化利用闲置消费发布订阅发布订阅高级之Royting(按关键字匹配)Topic关键字模糊匹配基于r的主要内容,如果未能解决你的问题,请参考以下文章

消息队列介绍RabbitMQ&Redis的重点介绍与简单应用

SpringBoot整合RabbitMQ实现死信队列

rabbitmq - (消息队列) 的基本原理介绍

rabbitMq介绍

消息队列介绍与RabbitMQ基本示例

3.RabbitMQ 第一个程序