rabbitmq结构
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq结构相关的知识,希望对你有一定的参考价值。
参考技术A 生产者创建消息,然后发布到 RabbitMQ 。消息一般可以包含两个部分:消息体和标签消息体也可以称之为 payload ,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者( Consumer )。
消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费这条消息时,只是消费消息的消息体( payload )。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
对于 RabbitMQ 来说, RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin ,即轮询)
给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理,RabbitMQ 不支持队列层面的广播消费。
真实情况是,生产者将消息发送到 Exchange (交换器,通常也
可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或
许会返回给生产者,或许直接丢弃。
生产者将消息发给交换器的时候, 一般会指定一个RoutingKey ,用来指定这个消息的路由规则,而这个 Routing Key 需要与交换器类型和绑定键( BindingKey )联合使用才能最终生效。
RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(Binding Key)。
生产者将消息发送给交换器时, 需要一个RoutingKey,BindingKey和RoutingKey 相匹配的时候,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端可以紧接着创建一个AMQP信道(Channel)。
信道是建立在 Connection 之上的虚拟连接,
我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?
RabbitMQ
目录
RabbitMQ
是什么
概念
? RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。
? RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。
基本结构
producer : 生产者,RabbitMQ的客户端,负责发送消息。
consumer :消费者,RabbitMQ的客户端,消费消息。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个 Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。
Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。
Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic三种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。
BindingKey :Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。
? Binding Key由Consumer在Binding Exchange与Message Queue时指定
*Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定*
Queue(Message Queue):消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。
Message :就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。
做什么
异步处理
Java使用消息队列还是直接使用线程池ExecutorService异步处理
应用解耦
流量削锋
怎么用
单发送单接收
Producer
package com.jc.rabbitmq.queue.oneponec;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
public final static String QUEUE_NAME="rabbitmq.jc.queue.oneponec";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
//队列名称、是否持久化(true表示是,队列将在服务器重启时生存)、是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、当所有消费者客户端连接断开时是否自动删除队列、队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//发送消息到队列中
String message = "Hello RabbitMQ2 ";
//basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
System.out.println("生产者生产消息成功!!");
//关闭通道和连接
channel.close();
connection.close();
}
}
consumer
package com.jc.rabbitmq.queue.oneponec;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class CustomerTest{
private final static String QUEUE_NAME = "rabbitmq.jc.queue.oneponec";
public static void main(String[] args) throws IOException, TimeoutException {
//设置RabbitMQ地址
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
//队列名称、是否持久化(true表示是,队列将在服务器重启时生存)、是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、当所有消费者客户端连接断开时是否自动删除队列、队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
QueueingConsumer consumer = new QueueingConsumer(channel);
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("消息的接收端已经开启了,可以接受消息了!!");
while(true){
QueueingConsumer.Delivery delivery;
try{
delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("队列中的消息是:" + msg);
}catch (ShutdownSignalException e){
// TODO Auto-generated catch block
e.printStackTrace();
}catch (ConsumerCancelledException e){
// TODO Auto-generated catch block
e.printStackTrace();
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
/*Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);*/
}
}
单发送多接收
P发送到队列上多条消息,分别被C1 和 C2 等概率接收,但当C1 没回复确认之前,不会再给C1
producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class ProducerTest{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!.";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"X").getBytes());
System.out.println(" 发送端发送消息: '" + message + "'");
System.out.println("休息10s");
try{
Thread.sleep(10000);
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("循环发布10条消息");
for(int i=0;i<10;i++){
channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes("UTF-8"));
System.out.println("生产者生产消息成功!! +'" + (message+i) + "'");
}
channel.close();
connection.close();
}
}
consumer1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" -------------------- 接受端1 ----------- ");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException{
for(char ch : task.toCharArray()){
if(ch == '.')
Thread.sleep(1000);
}
}
}
consumer2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.oneptwoc";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" -------------------- 接受端2 ----------- ");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException{
for(char ch : task.toCharArray()){
if(ch == '.')
Thread.sleep(1000);
}
}
}
发布、订阅模式 (Publish/Subscribe)
发送端发送广播消息,所有接收端都能接收消息
producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
/**
* 发送消息到一个名为“rabbitmq.jc.exchange.fanout”的exchange上,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收
*/
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//不设置消息队列,直接把要传递的消息发送给交换机,以广播方式:fanout 发送
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
System.out.println("休息10s");
try{
Thread.sleep(10000);
}catch (InterruptedException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("循环发布10条消息");
for(int i=0;i<10;i++){
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
System.out.println("生产者生产消息成功!! +'" + (message+i) + "'");
}
channel.close();
connection.close();
}
}
consumer1
package com.jc.rabbitmq.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.fanout”的exchange,方式为"fanout",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
}
}
}
consumer2
package com.jc.rabbitmq.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.fanout";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.fanout”的exchange,方式为"fanout",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '"+ message +"' [x] Done at " + System.currentTimeMillis());
}
}
}
按线路发送接收(Routing)
发送端按routing key发送消息,不同的接收端按不同的routing key接收消息
producer
package com.jc.rabbitmq.exchange.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//不设置消息队列,直接把要传递的消息发送给交换机,交换机类型是 direct,表明按路线 routing key 发送接收
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for(int i = 0 ; i < 10;i++){
//设置两条线路 direct1 和 direct2
String severity = "direct" + getRoutingKey();
//要发送的消息
String message = "hello world!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
private static String getRoutingKey(){
int ram = (int) (Math.random()*10);
if(ram%2 == 0){
return "1";
}else{
return "2";
}
}
}
consumer1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.direct”的exchange,方式为"direct",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置routing key 为direct1,表名这个消费者只接收 direct1 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "direct1");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
consumer2
package com.jc.rabbitmq.exchange.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.direct";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.direct”的exchange,方式为"direct",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置routing key 为direct2,表名这个消费者只接收 direct2 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "direct2");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
按Topic发送接收(Topics)
发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
producer
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTest{
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "Hello World!";
for(int i = 0; i < 10 ;i ++){
String routingKey = "topic" + getRoutingKey();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
String routingKey = "1";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "topic.test.1";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "2";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
routingKey = "topic.2";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
channel.close();
connection.close();
}
private static String getRoutingKey(){
int ram = (int) (Math.random()*10);
if(ram%2 == 0){
return ".1";
}else{
return ".test.2";
}
}
}
consumer1
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest1{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
/**
* Topic exchange
Topic exchange is powerful and can behave like other exchanges.
When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange.
When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one.
*/
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.topic”的exchange,方式为"topic",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
/**
* *表示匹配一个单词
* #表示匹配0或多个单词
*
* 注意这里是单词
*
* Messages sent to a topic exchange can't have an arbitrary(任意的) routing_key
* - it must be a list of words, delimited by dots.
* The words can be anything, but usually they specify some features connected to the message.
* A few valid routing key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".
* There can be as many words in the routing key as you like, up to the limit of 255 bytes.
*/
//绑定随机队列和声明的交换机,设置 routingKey 为 #1,表名这个消费者接收所有 以.1为结尾的,仅有两个单词 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.1");
System.out.println(" -------------------- 接受端1 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
consumer2
package com.jc.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class CustomerTest2{
//Exchange 消息交换机
private static final String EXCHANGE_NAME = "rabbitmq.jc.exchange.topic";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明名为“rabbitmq.jc.exchange.topic”的exchange,方式为"topic",同发送端
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息
String queueName = channel.queueDeclare().getQueue();
//绑定随机队列和声明的交换机,设置 routingKey 为 #2,表名这个消费者接收所有 以.2为结尾的,有1个或多个单词 这条线路上的消息
channel.queueBind(queueName, EXCHANGE_NAME, "#.2");
System.out.println(" -------------------- 接受端2 ----------- ");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
RPC方式
rpc的全称叫:远程过程调用,可以通俗的理解为通过网络调用另一台电脑上的函数的业务处理思想
工作流程:
客户端创建message时指定reply_to队列名、correlation_id标记调用者
通过队列,服务端收到消息。调用函数处理,然后返回
返回的队列是reply_to指定的队列,并携带correlation_id
返回消息到达客户端,客户端根据correlation_id判断是哪一个函数的调用返回
RPCClient1
package com.jc.rabbitmq.queue.rpc;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class RPCClient1 implements AutoCloseable {
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private Connection connection;
private Channel channel;
private String replyQueueName;
private QueueingConsumer consumer;
public static void main(String[] argv) {
RPCClient1 fibonacciRpc = null;
Map<String,Object> answerMap = new HashMap<String,Object>();
try {
fibonacciRpc = new RPCClient1();
Thread.sleep(5000);
System.out.println(" [x] Requesting fib(30)");
answerMap.put(fibonacciRpc.call("3"),null);
answerMap.put(fibonacciRpc.call("4"),null);
answerMap.put(fibonacciRpc.call("5"),null);
answerMap.put(fibonacciRpc.call("6"),null);
answerMap.put(fibonacciRpc.call("7"),null);
answerMap.put(fibonacciRpc.call("8"),null);
//获得计算结果
fibonacciRpc.answer(answerMap);
System.out.println(" [.] Got answer --------- ");
System.out.println(answerMap);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
public RPCClient1() throws IOException, TimeoutException {
//建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定义一个临时变量的接受队列名
replyQueueName = channel.queueDeclare().getQueue();
//获取server返回的消息
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
//发送RPC请求
public String call(String message) throws IOException, InterruptedException {
//生成一个唯一的字符串作为回调队列的编号
String corrId = UUID.randomUUID().toString();
System.out.println(corrId);
//发送请求消息,消息使用了两个属性:replyto和correlationId
//服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//发布一个消息,requestQueueName路由规则
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
return corrId;
}
public void answer(Map<String,Object> answerMap) throws Exception{
Delivery delivery = null;
//等待接收结果
for(int i = 0 ; i < answerMap.keySet().size(); i++){
delivery = consumer.nextDelivery();
//检查它的correlationId是否是我们所要找的那个
if (answerMap.keySet().contains(delivery.getProperties().getCorrelationId())) {
answerMap.put(delivery.getProperties().getCorrelationId(), new String(delivery.getBody(),"UTF-8"));
}
}
}
public void close() throws IOException {
connection.close();
}
}
RPCClient2
package com.jc.rabbitmq.queue.rpc;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient2{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private Connection connection;
private Channel channel;
private String replyQueueName;
private QueueingConsumer consumer;
public static void main(String[] argv) {
RPCClient1 fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient1();
Thread.sleep(10000);
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("1");
System.out.println(" [.] Got '" + response + "'");
response = fibonacciRpc.call("2");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
public RPCClient2() throws IOException, TimeoutException {
//建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定义一个临时变量的接受队列名
replyQueueName = channel.queueDeclare().getQueue();
//获取server返回的消息
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
//发送RPC请求
public String call(String message) throws IOException, InterruptedException {
//生成一个唯一的字符串作为回调队列的编号
String corrId = UUID.randomUUID().toString();
System.out.println(corrId);
//发送请求消息,消息使用了两个属性:replyto和correlationId
//服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//发布一个消息,requestQueueName路由规则
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
String response = "";
//等待接收结果
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//检查它的correlationId是否是我们所要找的那个
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(),"UTF-8");
break;
}
}
return response;
}
public void close() throws IOException {
connection.close();
}
}
RPCServer
package com.jc.rabbitmq.queue.rpc;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCServer{
private static final String QUEUE_NAME = "rabbitmq.jc.queue.rpc";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queuePurge(QUEUE_NAME);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//打开应答机制autoAck=false
channel.basicConsume(QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
String response = fib(n) + "";
System.out.println(" [.] fib(" + message + ") = " + response);
//返回处理结果队列
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes());
//发送应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null){
try {
connection.close();
channel.close();
} catch (IOException _ignore) {
}
}
}
}
}
参考链接:
https://www.cnblogs.com/LipeiNet/category/896408.html
https://www.cnblogs.com/lonelyxmas/p/10934514.html
https://blog.csdn.net/dreamhai/article/details/81774998
https://blog.csdn.net/qq_14901335/article/details/80451052
https://blog.csdn.net/sky_jiangcheng/article/details/82025154
https://blog.csdn.net/a236209186/article/details/78932483
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://www.cnblogs.com/goldsunshine/p/8665456.html
以上是关于rabbitmq结构的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现