RabbitMQ入门案例

Posted HairLossException

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门案例相关的知识,希望对你有一定的参考价值。

文章目录

官方文档


1.Hello World


生产者代码

public class Producer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接RabbitMQ队列
        factory.setHost("192.168.140.129");
        //用户名
        factory.setUsername("why");
        //密码
        factory.setPassword("123");
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        /*
        *生成队列
        *1、队列名称,若队列不存在就创建队列
        *2、队列里的消息是否支持持久化,默认消息存储在内存种
        *3、是否独占队列 true表示消费者独占队列
        *4、是否自动删除 true表示在消费者消费完队列中的数据并与该队列连接断开时自动删除队列
        *5、额外参数设置(本例种暂时不用)
        * */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "hello world";
        /*
        *发送消息
        * 1、发送到哪个交换机
        * 2、路由的key
        * 3、其他参数信息
        * 4、发送消息的消息体
        * */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    

消费者代码

public class Consumer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.140.129");
        factory.setUsername("why");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待消息接收");

        DeliverCallback deliverCallback = (consumerTag,delivery)->
            String message = new String(delivery.getBody());
            System.out.println("接收到的消息"+message);
        ;
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag)->
            System.out.println("消费者被中断");
        ;
        /*
        * 消费消息
        * 1、消费哪个队列
        * 2、消费成功之后是否自动应答 true表示自动应答 false表示手动应答
        * 3、消费者成功消费的回调函数
        * 4、消费者被中断消费的回调
        * */
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    

启动生产者程序我们发现在消息队列种存在一条未被消费的消息

然后启动消费者程序发现消息已经被消费

2.Work Queues


将可以复用的代码提取成工具类

public class MQUtil 
    public static Channel getChannel() throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.140.129");
        factory.setUsername("why");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    

生产者代码

public class Producer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        Channel channel = MQUtil.getChannel();
        /*
        *生成队列
        *1、队列名称,若队列不存在就创建队列
        *2、队列里的消息是否支持持久化,默认消息存储在内存种
        *3、是否独占队列 true表示消费者独占队列
        *4、是否自动删除 true表示在消费者消费完队列中的数据并与该队列连接断开时自动删除队列
        *5、额外参数设置(本例种暂时不用)
        * */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*
        *一次发送4条消息
        * 1、发送到哪个交换机
        * 2、路由的key
        * 3、其他参数信息
        * 4、发送消息的消息体
        * */
        for (int i = 1; i < 5; i++) 
            String message = "message"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        

        System.out.println("消息发送完毕");
    

消费者代码还是上个例子中的代码,启动两个消费者线程

运行结果

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
1号正在等待消息接收
1号消费者接收到的消息message1
1号消费者接收到的消息message3
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2号正在等待消息接收
2号消费者接收到的消息message2
2号消费者接收到的消息message4

可以看出消息队列是轮询分发消息的

2.1消息应答

如果消费者在消费消息的过程种突然down掉,会导致消息的丢失。为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。

方法描述
Channel.basicAck(long tag,boolean multiple)消息已经成功处理,可以将其丢弃
Channel.basicNack(ong tag,boolean multiple,boolean b1)否定确认
Channel.basicReject(long tag,boolean multiple)否定确认,直接拒绝

如果消费者由于某些原因失去连接,导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。使用手动应答即使消费者在处理消息的过程中宕机也不会造成消息的丢失(注意这里的消息不丢失指的是消息队列分配给消费者过程种消息不会丢失)

消费者代码

public class HandProducer 
    private static final String QUEUE_NAME = "hand";

    public static void main(String[] args) throws Exception
        Channel channel = MQUtil.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext())
            String message = scanner.nextLine();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("生产者发出消息"+message);
        
    

1号消费者代码

public class HandConsumer 
    private static final String QUEUE_NAME = "hand";

    public static void main(String[] args) throws Exception
        Channel channel = MQUtil.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        DeliverCallback deliverCallback = (consumerTag,delivery)->
            String message = new String(delivery.getBody());
            try 
                Thread.sleep(3*1000);//假设1号消费者处理消息耗时三秒
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println("1号消费者接收到的消息"+message);
            //1、消息标记tag
            //2、false表示不启动批应答
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        ;
        CancelCallback cancelCallback = (consumerTag)->;
        //手动应答
        boolean auto = false;
        channel.basicConsume(QUEUE_NAME,auto,deliverCallback,cancelCallback);
        System.out.println("1号消费者正在等待接收消息");
    

2号消费者代码仅仅在处理时间上有所不同

try 
	Thread.sleep(10*1000);//假设2号消费者处理消息耗时十秒
 catch (InterruptedException e) 
	e.printStackTrace();

首先生产者发送两条消息

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
aaa
生产者发出消息aaa
bbb
生产者发出消息bbb

在10秒内关闭2号消费者程序来模拟宕机

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2号消费者正在等待接收消息

Process finished with exit code -1

最后两条消息都由1号消费者处理

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
1号消费者正在等待接收消息
1号消费者接收到的消息aaa
1号消费者接收到的消息bbb

2.2队列持久化

每次重启 RabbitMQ ,非持久化的队列都被删除。在声明队列的时候将durable参数设置为true表明创建持久化队列。如果队列存在需要先删除原先的非持久化队列才能生效

boolean durable = true;
//创建持久化队列
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

2.3消息持久化

将消息标记为持久化并不能完全保证不会丢失消息。在消息保存的过程中依然会出现消息丢失的情况。实现消息持久化只需要将basicPublish方法参数的props改为MessageProperties.PERSISTENT_TEXT_PLAIN即可

//当队列是持久化队列时才能设置消息持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

2.4不公平分发

//写在消费者代码中
int prefetchCount = 1;
channel.basicQos(prefetchCount);

这段代码表示消费者在处理消息的过程中最多接收1个消息。如果消费者还在处理消息的过程中又有新的消息来了,RabbitMQ就会把消息分配给空闲的消费者。prefetchCount是预取值,表示该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息

3.发布确认

之前我们讨论过虽然队列持久化和消息持久化可以使消息不丢失,但是不能完全保证消息不丢失。因为在消息保存到磁盘的过程中依然有可能丢失数据。开启发布确认模式才能保证消息不丢失

开启发布确认

//开启发布确认
channel.confirmSelect();

3.1单个发布确认

发布一条消息只有它被确认发布后才继续发送后续消息

public class Producer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        Channel channel = MQUtil.getChannel();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        long beign = System.currentTimeMillis();
        for (int i = 1; i < 101; i++) 
            String message = "message"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //单个发布确认
            boolean flag = channel.waitForConfirms();
            if (flag)
                System.out.println("消息发布成功");
            
        
        long end = System.currentTimeMillis();
        System.out.println("发布100条消息共耗时"+(end-beign)+"ms");
    

3.2批量发布确认

for (int i = 1; i < 101; i++) 
	String message = "message"+i;
	channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
	//批量发布确认:每发布10条消息确认一次
	if (i%10 == 0)
		boolean flag = channel.waitForConfirms();
		if (flag)
			System.out.println("消息发布成功");
		
	

3.3异步发布确认

public class Producer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        Channel channel = MQUtil.getChannel();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //线程安全有序的哈希表,适用于高并发
        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
        long begin = System.currentTimeMillis();
        //消息确认收到的回调
        ConfirmCallback ackCallback = (deliveryTag,multiple)->
            //处理未确认消息第二步:删除已经确认的消息
            if (multiple)
                ConcurrentNavigableMap<Long,String> confirmedMap = map.headMap(deliveryTag);
                confirmedMap.clear();
            else
                map.remove(deliveryTag);
            
            System.out.println("确认收到消息"+deliveryTag);
        ;
        //消息确认失败的回调
        ConfirmCallback nackCallback = (deliveryTag,multiple)->
            //map中剩下的就是未确认的消息
            System.out.println("未确认消息"+deliveryTag);
        ;
        /*
        * 准备监听器,监听哪些消息收到了,哪些消息失效了
        * 1、ackCallback:监听哪些消息成功了
        * 2、nackCallback:监听哪些消息失败了
        * */
        channel.addConfirmListener(ackCallback,nackCallback);
        //批量发送
        for (int i = 1; i < 101; i++) 
            String message = "消息"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            //处理未确认消息第一步:记录所有发送的消息
            map.put(channel.getNextPublishSeqNo()RabbitMQ延迟队列

RabbitMQ死信队列

rabbitmq死信队列及延迟队列

RabbitMQ—SpringBoot中实现死信队列

RabbitMQ的死信队列和延时队列

RabbitMQ 中的死信死信消息