消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)
Posted 小不点啊
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)相关的知识,希望对你有一定的参考价值。
准备工作:
1)安装RabbitMQ,参考文章:消息中间件系列二:RabbitMQ入门(基本概念、RabbitMQ的安装和运行)
2.)分别新建名为OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程
在pom.xml文件里面引入如下依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
说明:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本
一、消费者(接收方)自动确认模式
前面有谈到消费者收到的每一条消息都必须进行确认,消息的确认机制分为自动确认和消费者自行确认。下面我们来看一下自动确认的示例:
示例1:交换器是direct
1. 在工程OriginalRabbitMQProducer新建一个一个direct的生产者
package study.demo.normal; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是direct的生产者 路由键完全匹配时,消息才投放到对应队列 * @author leeSmall * @date 2018年9月15日 * */ public class DirectProducer { //定义交换器的名字 private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的 //factory.setUsername(..); //设置连接断开 这里使用缺省的 //factory.setPort(); //设置虚拟主机 这里使用缺省的 //factory.setVirtualHost(); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //定义一组路由键 String[]routingKeys = {"error","info","warning"}; //发布消息的交换器上 for(int i=0;i<3;i++){ //路由键 String routingKey = routingKeys[i]; //要发送的消息 String message = "Hello world_"+(i+1); /** * 发送消息到交换器上 * 参数1:交换器的名字 * 参数2:路由键 * 参数3:BasicProperties * 参数4:要发送的消息 */ channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("Sent "+routingKey+":"+message); } channel.close(); connection.close(); } }
2. 在工程OriginalRabbitMQConsumer新建一个direct的只消费error日志的消费者
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是direct 只消费error日志的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerError { //定义交换器的名字 private static final String EXCHANGE_NAME = "direct_logs"; // private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明随机队列 String queueName = channel.queueDeclare().getQueue(); //6.声明一个只消费错误日志的路由键error String routingKey = "error"; //7.队列通过路由键绑定到交换器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自动确认:autoAck参数为true channel.basicConsume(queueName,true,consumerB); } }
3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果
启动消费者ConsumerError:
启动生产者DirectProducer:
查看消费者ConsumerError现在的状况:
可以看到消费者只消费了error级别的消息,这是因为在direct模式下,消费者只定义了路由键routingKey = "error";
4. 在工程OriginalRabbitMQConsumer新建一个direct的消费所有日志的消费者
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是direct 消费所有日志的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerAll { //定义交换器的名字 private static final String EXCHANGE_NAME = "direct_logs"; // private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明随机队列 String queueName = channel.queueDeclare().getQueue(); //6.定义一组路由键消费所有日志 String[]routingKeys = {"error","info","warning"}; //7.队列通过路由键绑定到交换器上 for(String routingKey:routingKeys){ //队列和交换器的绑定 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); } System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerA = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自动确认:autoAck参数为true channel.basicConsume(queueName,true,consumerA); } }
5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果
启动消费者ConsumerAll:
启动生产者DirectProducer:
查看消费者ConsumerAll的状态:
可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在direct模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};
示例2:交换器是fanout
1. 在工程OriginalRabbitMQProducer新建一个交换器是fanout的生产者
package study.demo.normal; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是fanout的生产者 可以理解为广播,会把所有消息投放到绑定到这个交换器上的队列上 * @author leeSmall * @date 2018年9月15日 * */ public class FanoutProducer { private final static String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的 //factory.setUsername(..); //设置连接断开 这里使用缺省的 //factory.setPort(); //设置虚拟主机 这里使用缺省的 //factory.setVirtualHost(); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //定义一组路由键 String[]routingKeys = {"error","info","warning"}; //发布消息的交换器上 for(int i=0;i<3;i++){ //路由键 String routingKey = routingKeys[i]; //要发送的消息 String message = "Hello world_"+(i+1); /** * 发送消息到交换器上 * 参数1:交换器的名字 * 参数2:路由键 * 参数3:BasicProperties * 参数4:要发送的消息 */ channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("Sent "+routingKey+":"+message); } channel.close(); connection.close(); } }
2. 在工程OriginalRabbitMQConsumer新建一个fanout的只消费error日志的消费者
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是fanout,只消费error日志的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerError { //定义交换器的名字 //private static final String EXCHANGE_NAME = "direct_logs"; private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.声明随机队列 String queueName = channel.queueDeclare().getQueue(); //6.声明一个只消费错误日志的路由键error String routingKey = "error"; //7.队列通过路由键绑定到交换器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自动确认:autoAck参数为true channel.basicConsume(queueName,true,consumerA); } }
3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果
启动消费者ConsumerError:
启动生产者FanoutProducer:
查看消费者ConsumerError现在的状况:
可以看到消费者消费了所有级别的消息,这是因为在fanout模式下,虽然消费者只定义了路由键routingKey = "error",但是因为fanut是广播模式,会把所有消息投放到绑定到这个交换器上的队列上
4. 在工程OriginalRabbitMQConsumer新建一个fanout的消费所有日志的消费者
package study.demo.normal; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 交换器是fanout 消费所有日志的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerAll { //定义交换器的名字 //private static final String EXCHANGE_NAME = "direct_logs"; private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.声明随机队列 String queueName = channel.queueDeclare().getQueue(); //6.定义一组路由键消费所有日志 String[]routingKeys = {"error","info","warning"}; //7.队列通过路由键绑定到交换器上 for(String routingKey:routingKeys){ //队列和交换器的绑定 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); } System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerA = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //9.自动确认:autoAck参数为true channel.basicConsume(queueName,true,consumerA); } }
5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果
启动消费者ConsumerAll:
启动生产者FanoutProducer:
查看消费者ConsumerAll的状态:
可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在fanout模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};
三、消费者(接收方)自行确认模式
1. 在工程OriginalRabbitMQProducer新建一个消费者自行确认生产者
package study.demo.consumerconfirm; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消费者自行确认生产者 * @author leeSmall * @date 2018年9月15日 * */ public class ConsumerConfirmProducer { //交换器 private final static String EXCHANGE_NAME = "direct_cc_confirm_1"; //路由键 private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的 //factory.setUsername(..); //设置连接断开 这里使用缺省的 //factory.setPort(); //设置虚拟主机 这里使用缺省的 //factory.setVirtualHost(); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //发布消息的交换器上 for(int i=0;i<10;i++){ //要发送的消息 String message = "Hello world_"+(i+1); /** * 发送消息到交换器上 * 参数1:交换器的名字 * 参数2:路由键 * 参数3:BasicProperties * 参数4:要发送的消息 */ channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes()); System.out.println("Sent "+ROUTE_KEY+":"+message); } channel.close(); connection.close(); } }
2. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认消费者
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消费者自行确认消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.声明一个只消费错误日志的路由键error String routingKey = "error"; //7.队列通过路由键绑定到交换器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); //消费者自行确认 this.getChannel().basicAck(envelope.getDeliveryTag(),false); } }; //9.消费者自行确认:autoAck参数为false channel.basicConsume(queueName,false,consumerB); } }
3. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认休眠不回复ack的消费者
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消费者自行确认休眠不回复ack的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerSlowAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接 Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的 Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null); //6.声明一个只消费错误日志的路由键error String routingKey = "error"; //7.队列通过路由键绑定到交换器上 channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息 Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //消费者自行确认时不回复ack,一直休眠 Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); //this.getChannel().basicAck(envelope.getDeliveryTag(),false); } }; //9.消费者自行确认:autoAck参数为false channel.basicConsume(queueName,false,consumerB); } }
4. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态
启动消费者自行确认消费者ClientConsumerAck:
启动消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck
启动消费者自行确认生产者ConsumerConfirmProducer
查看消费者自行确认消费者ClientConsumerAck的状态:
查看消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck的状态:
查看RabbitMQ服务器上的队列情况:
可以看到队列consumer_confirm里面有5条消息未消费,这是因为消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck收到了这5条消息,但是没有向RabbitMQ服务器发送确认消息,RabbitMQMQ认为这5条消息还没有被消费就一直存在队列里面
下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服务器里面队列consumer_confirm的状态
可以看到停掉ClientConsumerSlowAck以后,之前的5条消息被ClientConsumerAck消费了
5. 在工程OriginalRabbitMQConsumer 新建一个消费者自行确认拒绝消息的消费者
package study.demo.consumerconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * @Description: 消费者自行确认拒绝消息的消费者 * @author leeSmall * @date 2018年9月15日 * */ public class ClientConsumerReject { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址 factory.setHost("127.0.0.1"以上是关于消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)的主要内容,如果未能解决你的问题,请参考以下文章Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送
消息中间件系列二:RabbitMQ入门(基本概念RabbitMQ的安装和运行)