RabbitMQ的使用
Posted 永旗狍子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的使用相关的知识,希望对你有一定的参考价值。
目录
RabbitMQ的使用
一.Java连接RabbitMQ
1.1创建Maven项目
pass 。。。。
1.2导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
1.3创建工具类连接RabbitMQ
public class MQConnection {
public static Connection getConnetion(){
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.247.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
factory.setVirtualHost("/root");
// 创建Connection
Connection connection =null;
try {
connection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
二.通讯方式
1.Hello-World
一个生产者,一个默认的交换机,一个队列,一个消费者
创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
public class publish {
//创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
@Test
public void publish() throws IOException, TimeoutException {
//1. 获取Connection
Connection connetion = MQConnection.getConnetion();
//2. 创建Channel
Channel channel = connetion.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg="Hello-word!666";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("","Hello",null,msg.getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("消息发布成功!加油");
//4. 释放资源
channel.close();
connetion.close();
}
}
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
public class consume {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Hello",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受消息:"+new String(body,"utf-8"));
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("Hello",true,consumer);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connetion.close();
}
}
2.Work
一个生产者,一个默认的交换机,一个队列,两个消费者
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
消费者指定Qos和手动ack
public class publish {
//创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
@Test
public void publish() throws IOException, TimeoutException {
//1. 获取Connection
Connection connetion = MQConnection.getConnetion();
//2. 创建Channel
Channel channel = connetion.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg="Hello-word!666";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
for (int i=0;i<10;i++){
channel.basicPublish("","Work",null,msg.getBytes());
}
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("消息发布成功!加油");
//4. 释放资源
channel.close();
connetion.close();
}
}
public class consume1 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
// 指定当前消费者,一次消费多少个消息
channel.basicQos(4);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("Work",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
public class consume2 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("Work",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
3.Publish/Subscribe
一个生产者,一个交换机,两个队列,两个消费者
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
public class publish {
// 声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
// 让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
@Test
public void publish() throws IOException, TimeoutException {
//1. 获取Connection
Connection connetion = MQConnection.getConnetion();
//2. 创建Channel
Channel channel = connetion.createChannel();
//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1","pubsub-exchange","");
channel.queueBind("pubsub-queue2","pubsub-exchange","");
//4. 发布消息到exchange,同时指定路由的规则
String msg="Hello-word!666";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
for (int i=0;i<10;i++){
channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
}
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("消息发布成功!加油");
//4. 释放资源
channel.close();
connetion.close();
}
}
public class consume1 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("pubsub-queue1",true,false,false,null);
// 指定当前消费者,一次消费多少个消息
// channel.basicQos(6);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("pubsub-queue1",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
public class consume2 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("pubsub-queue2",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("pubsub-queue2",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
4.Routing
一个生产者,一个交换机,两个队列,两个消费者
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
public class publish {
// 生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,
// 并且在发送消息时,指定消息的具体RoutingKey即可。
@Test
public void publish() throws IOException, TimeoutException {
//1. 获取Connection
Connection connetion = MQConnection.getConnetion();
//2. 创建Channel
Channel channel = connetion.createChannel();
//3. 创建exchange, routing-queue-error,routing-queue-info,
//参数1: exchange的名称
//参数2: 指定exchange的类型
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");
//4. 发布消息到exchange,同时指定路由的规则
String msg="Hello-word!666";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("消息发布成功!加油");
//4. 释放资源
channel.close();
connetion.close();
}
}
public class consume1 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("routing-queue-error",true,false,false,null);
// 指定当前消费者,一次消费多少个消息
// channel.basicQos(6);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("routing-queue-error",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
public class consume2 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("routing-queue-info",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("routing-queue-info",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
5.Topic
一个生产者,一个交换机,两个队列,两个消费者
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
public class publish {
// 生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,
// 编写时注意格式 xxx.xxx.xxx去编写, \\* -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
@Test
public void publish() throws IOException, TimeoutException {
//1. 获取Connection
Connection connetion = MQConnection.getConnetion();
//2. 创建Channel
Channel channel = connetion.createChannel();
//3. 创建exchange并指定绑定方式
// fast.red.cat
// fast.white.dog
// slow.yello.dog
channel.exchangeDeclare("topic-exchange",BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
//4. 发布消息到exchange,同时指定路由的规则
// String msg="Hello-word!666";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("消息发布成功!加油");
//4. 释放资源
channel.close();
connetion.close();
}
}
public class consume1 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("topic-queue-1",true,false,false,null);
// 指定当前消费者,一次消费多少个消息
// channel.basicQos(6);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("topic-queue-1",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
public class consume2 {
@Test
public void Consume() throws IOException, TimeoutException {
Connection connetion = MQConnection.getConnetion();
final Channel channel = connetion.createChannel();
//3. 声明队列-HelloWorld declare声明
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("topic-queue-2",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
//2. 手动ack
//参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
//参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("topic-queue-2",false,consumer);
System.in.read();
channel.close();
connetion.close();
}
}
以上是关于RabbitMQ的使用的主要内容,如果未能解决你的问题,请参考以下文章