RabbitMQ消费端自定义监听
Posted luhan777
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消费端自定义监听相关的知识,希望对你有一定的参考价值。
场景:
我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。
实际环境:
我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用。
操作:
//生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; for(int i =0; i<5; i ++) channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
//消费端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //使用自定义consumer channel.basicConsume(queueName, true, new MyConsumer(channel));
//自定义消费端 //继承DefaultConsumer类 public class MyConsumer extends DefaultConsumer public MyConsumer(Channel channel) super(channel); //重写handleDelivery() @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body));
运行结果:
以上是关于RabbitMQ消费端自定义监听的主要内容,如果未能解决你的问题,请参考以下文章
ESP32学习笔记(30)——BLE GATT服务端自定义服务和特征
SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题