(九)RabbitMQ交换机(Exchange)
Posted 小怪吖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(九)RabbitMQ交换机(Exchange)相关的知识,希望对你有一定的参考价值。
交换机Exchange
1、交换机
1.1. Exchanges 概念
RabbitMQ 消息传递模型的核心思想:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。
1.2. Exchanges 的类型
- 直接(direct),
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
1.3. 无名exchange(默认交换机)
也就是推送消息时,交换机的参数是 “ ”,这里用的就是默认的交换机
channel.basicPublish("", queueName, null, msg.getBytes());
2、临时队列
也就是队列名称随机产生,一旦我们断开了消费者的连接,队列将被自动删除,也没有持久化
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
3、绑定(bindings)
概念 :binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。
4、Fanout(发布/订阅)
将接收到的所有消息广播到它绑定的所有队列中,也就是关于routingKey他不管。系统中油默认 exchange 类型
不管是生产者还是消费者声明的交换机,只要谁先启动就只有一个
生产者
package com.feng.fanoutExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:26
* @Version 1.0
* @Description fanout模式生产者
*/
public class EmitLog
//交换机名称
public static final String EXCHANGE_NAME = "logs";`在这里插入代码片`
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String msg = scanner.next();
//设置消息持久化
channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+msg);
消费者1
package com.feng.fanoutExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:08
* @Version 1.0
* @Description 消息消费者1
*/
public class ReceiveLogs01
//交换机名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机,因为是fanout所以不用声明交换机也行
//channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个临时队列
/**
* 1、队列名是随机取得
* 2、连接断开之后队列就删除了
*/
String queueName = channel.queueDeclare().getQueue();//返回队列名
//交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, (String consumerTag, Delivery message) ->
System.out.println("01接收消息是:" + new String(message.getBody(), "UTF-8"));
, consumerTag ->
);
消费者2
package com.feng.fanoutExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:08
* @Version 1.0
* @Description 消息消费者1
*/
public class ReceiveLogs02
//交换机名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机,因为是fanout所以不用声明交换机也行
//channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个临时队列
/**
* 1、队列名是随机取得
* 2、连接断开之后队列就删除了
*/
String queueName = channel.queueDeclare().getQueue();//返回队列名
//交换机绑定队列,这里因为是 Fanout模式,所以路由key其实并不重要,可以随便取
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, (String consumerTag, Delivery message) ->
System.out.println("02接收消息是:" + new String(message.getBody(), "UTF-8"));
, consumerTag ->
);
5、Direct exchange、
队列只对它绑定的交换机的消息感兴趣。也就是由routingKey决定交换机发送到哪个队列
代码实现
示意图
console队列
package com.feng.directExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:45
* @Version 1.0
* @Description direct 模式交换机消费者1
*/
public class ReceiveLogsDirect01
//交换机名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机,因为是fanout所以不用声明交换机也行
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("console",false,false,false,null);
//交换机绑定队列,这里因为是 direct,所以路由key得绑定
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
//接收消息
channel.basicConsume("console", true, (String consumerTag, Delivery message) ->
System.out.println("console队列接收消息是:" + new String(message.getBody(), "UTF-8"));
, consumerTag ->
);
disk队列
package com.feng.directExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:45
* @Version 1.0
* @Description direct 模式交换机消费者2
*/
public class ReceiveLogsDirect02
//交换机名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机,因为是fanout所以不用声明交换机也行
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("disk",false,false,false,null);
//交换机绑定队列,这里因为是 direct,所以路由key得绑定
channel.queueBind("disk", EXCHANGE_NAME, "error");
//接收消息
channel.basicConsume("disk", true, (String consumerTag, Delivery message) ->
System.out.println("disk队列接收消息是:" + new String(message.getBody(), "UTF-8"));
, consumerTag ->
);
发送方
package com.feng.directExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/24 20:58
* @Version 1.0
* @Description direct 模式交换机生产者
*/
public class DirectLogs
//交换机名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT );
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String msg = scanner.next();
//这里想给谁发,就指定对应得RountingKey就行
channel.basicPublish(EXCHANGE_NAME,"error", null,msg.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+msg);
6、Topics
topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。比如说:“stock.usd.nyse”, “nyse.vmw”,有两个替换符,
*(星号) 可以代替一个单词
#(井号) 可以替代零个或多个单词
缺点:就是速度慢,需要匹配
特殊情况
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
代码实现
绑定情况
消费者1
package com.feng.topicExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/25 14:03
* @Version 1.0
* @Description Topic交换机消费者01
*/
public class ReceiveLogsTopic01
//交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
//队列名
private static final String QUEUE_NAME = "Q1";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
//消费消息
channel.basicConsume(QUEUE_NAME,true,(String consumerTag, Delivery message) ->
System.out.println(QUEUE_NAME+"接收消息是:" + new String(message.getBody(), "UTF-8"));
System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
, consumerTag -> );
消费者2
package com.feng.topicExchange;
import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author Feng
* @Date 2022/11/25 14:03
* @Version 1.0
* @Description Topic交换机消费者02
*/
public class ReceiveLogsTopic02
//交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
//队列名
private static final String QUEUE_NAME = "Q2";
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
//消费消息
channel.basicConsume(QUEUE_NAME,<以上是关于(九)RabbitMQ交换机(Exchange)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ初学之一:exchange与queue的绑定