RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例

Posted beixuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例相关的知识,希望对你有一定的参考价值。

RabbitMq高级特性之消费端限流

一丶首先部署SpringBoot框架

  1. 完成 SpringBoot 整合 RabbitMq 中的Topic通配符模式

二丶在 resource资源文件夹里application.yml文件中 添加配置

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动签收
        prefetch: 3 #一次就收三条

三、更改ProducerTest.java文件

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class producerTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
    
        String routingKey = "item.insert";
    
        int count = 1;
        while (count <= 9){
            String message = "发送第"+count+"条消息";
            //log.debug("路由键:{}",routingKey);
            rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME,routingKey,message);
            count++;
        }
        log.debug("发送成功");
    }
}

四、更改CounmerListener.java文件

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * 消费者 消费监听器
 */
@Component
@RabbitListener(queues = "direct_queue")
@Slf4j
public class ConsumerListener {

    @RabbitHandler
    public void accept(@Payload String message, @Headers Map map, Channel channel){

        long deliveryTag = (long) map.get(AmqpHeaders.DELIVERY_TAG);
        log.debug("deliveryTag:{}",deliveryTag);
        log.debug("message:{}",message);
        if (deliveryTag % 3 == 0) {
            try {
                //确认收到
                channel.basicAck(deliveryTag,true);
                Thread.sleep(3000);
                log.debug("休息三秒然后在接受消息");
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }

}

五、测试

先运行测试文件 创建交换机和队列

然后在运行消息监听器

本次内容运用到 RabbitMq确认消息机制

以上是关于RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

RabbitMq高级特性之死信队列 通俗易懂 超详细 内含案例

RabbitMQ高级特性(2)

RabbitMQ消费端限流策略

RabbitMQ 消费端限流TTL死信队列

RabbitMQ之消息模式(下)