RabbitMQ
Posted sangong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ相关的知识,希望对你有一定的参考价值。
目录
RabbitMQ
是什么
概念
? RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。
? RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。
基本结构
producer : 生产者,RabbitMQ的客户端,负责发送消息。
consumer :消费者,RabbitMQ的客户端,消费消息。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个 Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。
Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。
Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic三种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。
BindingKey :Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。
? Binding Key由Consumer在Binding Exchange与Message Queue时指定
*Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定*
Queue(Message Queue):消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。
Message :就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。
做什么
异步处理
Java使用消息队列还是直接使用线程池ExecutorService异步处理
应用解耦
流量削锋
怎么用
单发送单接收
Producer
package com.jc.rabbitmq.queue.oneponec;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
public final static String QUEUE_NAME="rabbitmq.jc.queue.oneponec";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
//队列名称、是否持久化(true表示是,队列将在服务器重启时生存)、是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、当所有消费者客户端连接断开时是否自动删除队列、队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//发送消息到队列中
String message = "Hello RabbitMQ2 ";
//basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
System.out.println("生产者生产消息成功!!");
//关闭通道和连接
channel.close();
connection.close();
}
}
consumer
package com.jc.rabbitmq.queue.oneponec;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class CustomerTest{
private final static String QUEUE_NAME = "rabbitmq.jc.queue.oneponec";
public static void main(String[] args) throws IOException, TimeoutException {
//设置RabbitMQ地址
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
//队列名称、是否持久化(true表示是,队列将在服务器重启时生存)、是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、当所有消费者客户端连接断开时是否自动删除队列、队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
QueueingConsumer consumer = new QueueingConsumer(channel);
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("消息的接收端已经开启了,可以接受消息了!!");
while(true){
QueueingConsumer.Delivery delivery;
try{
delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("队列中的消息是:" + msg);
}catch (ShutdownSignalException e){
// TODO Auto-generated catch block
e.printStackTrace();
}catch (ConsumerCancelledException e){
// TODO Auto-generated catch block
e.printStackTrace();
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
/*Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);*/
}
}
单发送多接收
P发送到队列上多条消息,分别被C1 和 C2 等概率接收,但当C1 没回复确认之前,不会再给C1
producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class ProducerTest{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!.";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"X").getBytes());
System.out.println(" 发送端发送消息: '" + message + "'");
System.out.println("休息10s");
try{
Thread.sleep(10000);
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("循环发布10条消息");
for(int i=0;i<10;i++){
channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes("UTF-8"));
System.out.println("生产者生产消息成功!! +'" + (message+i) + "'");
}
channel.close();
connection.close();
}
}
consumer1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" -------------------- 接受端1 ----------- ");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException{
for(char ch : task.toCharArray()){
if(ch == '.')
Thread.sleep(1000);
}
}
}
consumer2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" -------------------- 接受端2 ----------- ");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException{
for(char ch : task.toCharArray()){
if(ch == '.')
Thread.sleep(1000);
}
}
}
发布、订阅模式 (Publish/Subscribe)
发送端发送广播消息,所有接收端都能接收消息
producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
/**
* 发送消息到一个名为“rabbitmq.jc.exchange.fanout”的exchange上,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收
*/
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//不设置消息队列,直接把要传递的消息发送给交换机,以广播方式:fanout 发送
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
System.out.println("休息10s");
try{
Thread.sleep(10000);
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("循环发布10条消息");
for(int i=0;i<10;i++){
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
System.out.println("生产者生产消息成功!! +'" + (message+i) + "'");
}
channel.close();
connection.close();
}
}
consumer1
package com.jc.rabbitmq.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.fanout”的exchange,方式为"fanout",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
}
}
}
consumer2
package com.jc.rabbitmq.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.fanout”的exchange,方式为"fanout",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
}
}
}
按线路发送接收(Routing)
发送端按routing key发送消息,不同的接收端按不同的routing key接收消息
producer
package com.jc.rabbitmq.exchange.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//不设置消息队列,直接把要传递的消息发送给交换机,交换机类型是 direct,表明按路线 routing key 发送接收
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for(int i = 0 ; i < 10;i++){
//设置两条线路 direct1 和 direct2
String severity = "direct" + getRoutingKey();
//要发送的消息
String message = "hello world!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
private static String getRoutingKey(){
int ram = (int) (Math.random()*10);
if(ram%2 == 0){
return "1";
}else{
return "2";
}
}
}
consumer1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.direct”的exchange,方式为"direct",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置routing key 为direct1,表名这个消费者只接收 direct1 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "direct1");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
consumer2
package com.jc.rabbitmq.exchange.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.direct”的exchange,方式为"direct",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置routing key 为direct2,表名这个消费者只接收 direct2 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "direct2");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
按Topic发送接收(Topics)
发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
producer
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "Hello World!";
for(int i = 0; i < 10 ;i ++){
String routingKey = "topic" + getRoutingKey();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
String routingKey = "1";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "topic.test.1";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "2";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "topic.2";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
channel.close();
connection.close();
}
private static String getRoutingKey(){
int ram = (int) (Math.random()*10);
if(ram%2 == 0){
return ".1";
}else{
return ".test.2";
}
}
}
consumer1
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
/**
* Topic exchange
Topic exchange is powerful and can behave like other exchanges.
When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange.
When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one.
*/
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.topic”的exchange,方式为"topic",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
/**
* *表示匹配一个单词
* #表示匹配0或多个单词
*
* 注意这里是单词
*
* Messages sent to a topic exchange can't have an arbitrary(任意的) routing_key
* - it must be a list of words, delimited by dots.
* The words can be anything, but usually they specify some features connected to the message.
* A few valid routing key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".
* There can be as many words in the routing key as you like, up to the limit of 255 bytes.
*/
//绑定随机队列和声明的交换机,设置 routingKey 为 #1,表名这个消费者接收所有 以.1为结尾的,仅有两个单词 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.1");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
consumer2
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.topic”的exchange,方式为"topic",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置 routingKey 为 #2,表名这个消费者接收所有 以.2为结尾的,有1个或多个单词 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "#.2");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
RPC方式
rpc的全称叫:远程过程调用,可以通俗的理解为通过网络调用另一台电脑上的函数的业务处理思想
工作流程:
客户端创建message时指定reply_to队列名、correlation_id标记调用者
通过队列,服务端收到消息。调用函数处理,然后返回
返回的队列是reply_to指定的队列,并携带correlation_id
返回消息到达客户端,客户端根据correlation_id判断是哪一个函数的调用返回
RPCClient1
package com.jc.rabbitmq.queue.rpc;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class RPCClient1 implements AutoCloseable {
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private Connection connection;
private Channel channel;
private String replyQueueName;
private QueueingConsumer consumer;
public static void main(String[] argv) {
RPCClient1 fibonacciRpc = null;
Map<String,Object> answerMap = new HashMap<String,Object>();
try {
fibonacciRpc = new RPCClient1();
Thread.sleep(5000);
System.out.println(" [x] Requesting fib(30)");
answerMap.put(fibonacciRpc.call("3"),null);
answerMap.put(fibonacciRpc.call("4"),null);
answerMap.put(fibonacciRpc.call("5"),null);
answerMap.put(fibonacciRpc.call("6"),null);
answerMap.put(fibonacciRpc.call("7"),null);
answerMap.put(fibonacciRpc.call("8"),null);
//获得计算结果
fibonacciRpc.answer(answerMap);
System.out.println(" [.] Got answer --------- ");
System.out.println(answerMap);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
public RPCClient1() throws IOException, TimeoutException {
//建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定义一个临时变量的接受队列名
replyQueueName = channel.queueDeclare().getQueue();
//获取server返回的消息
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
//发送RPC请求
public String call(String message) throws IOException, InterruptedException {
//生成一个唯一的字符串作为回调队列的编号
String corrId = UUID.randomUUID().toString();
System.out.println(corrId);
//发送请求消息,消息使用了两个属性:replyto和correlationId
//服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//发布一个消息,requestQueueName路由规则
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
return corrId;
}
public void answer(Map<String,Object> answerMap) throws Exception{
Delivery delivery = null;
//等待接收结果
for(int i = 0 ; i < answerMap.keySet().size(); i++){
delivery = consumer.nextDelivery();
//检查它的correlationId是否是我们所要找的那个
if (answerMap.keySet().contains(delivery.getProperties().getCorrelationId())) {
answerMap.put(delivery.getProperties().getCorrelationId(), new String(delivery.getBody(),"UTF-8"));
}
}
}
public void close() throws IOException {
connection.close();
}
}
RPCClient2
package com.jc.rabbitmq.queue.rpc;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient2{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private Connection connection;
private Channel channel;
private String replyQueueName;
private QueueingConsumer consumer;
public static void main(String[] argv) {
RPCClient1 fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient1();
Thread.sleep(10000);
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("1");
System.out.println(" [.] Got '" + response + "'");
response = fibonacciRpc.call("2");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
public RPCClient2() throws IOException, TimeoutException {
//建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定义一个临时变量的接受队列名
replyQueueName = channel.queueDeclare().getQueue();
//获取server返回的消息
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
//发送RPC请求
public String call(String message) throws IOException, InterruptedException {
//生成一个唯一的字符串作为回调队列的编号
String corrId = UUID.randomUUID().toString();
System.out.println(corrId);
//发送请求消息,消息使用了两个属性:replyto和correlationId
//服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//发布一个消息,requestQueueName路由规则
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
String response = "";
//等待接收结果
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//检查它的correlationId是否是我们所要找的那个
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(),"UTF-8");
break;
}
}
return response;
}
public void close() throws IOException {
connection.close();
}
}
RPCServer
package com.jc.rabbitmq.queue.rpc;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCServer{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queuePurge(QUEUE_NAME);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//打开应答机制autoAck=false
channel.basicConsume(QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
String response = fib(n) + "";
System.out.println(" [.] fib(" + message + ") = " + response);
//返回处理结果队列
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes());
//发送应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null){
try {
connection.close();
channel.close();
} catch (IOException _ignore) {
}
}
}
}
}
参考链接:
https://www.cnblogs.com/LipeiNet/category/896408.html
https://www.cnblogs.com/lonelyxmas/p/10934514.html
https://blog.csdn.net/dreamhai/article/details/81774998
https://blog.csdn.net/qq_14901335/article/details/80451052
https://blog.csdn.net/sky_jiangcheng/article/details/82025154
https://blog.csdn.net/a236209186/article/details/78932483
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://www.cnblogs.com/goldsunshine/p/8665456.html
以上是关于RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
带着新人学springboot的应用07(springboot+RabbitMQ 下)