RabbitMQ--发布订阅模式
Posted Gendan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ--发布订阅模式相关的知识,希望对你有一定的参考价值。
消费者1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @version v1.0
- @Date: 2021/6/11 22:58
- @Author: Mr.Throne
- @Description: 消费者
*/
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
String queueName1 = "test_fanout_queue1";
String queueName2 = "test_fanout_queue2";
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设定参数 ip地址
factory.setHost("121.196.161.240");
factory.setPort(5672);
factory.setVirtualHost("/admin");
factory.setUsername("admin");
factory.setPassword("admin");
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// 参数String queue名称 没有自动创建
// boolean durable 是否持久化
// boolean exclusive 是否独占 连接关闭后是否删除队列
// boolean autoDelete 没有Consumer时 自动删除
// Map<String, Object> arguments 参数信息
channel.queueDeclare("work_queues",true,false,false,null);
//6.接收消息
// String queue 队列名称
// boolean autoAck 是否自动确认
// Consumer callback 回调对象
Consumer consumer = new DefaultConsumer(channel){
//收到消息后的回调方法
//consumerTag 消息标识
//envelope 获取交换机 路由等信息
//body 数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume("test_fanout_queue1",true,consumer);
}
}
消费者2:
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @version v1.0
- @Date: 2021/6/11 22:58
- @Author: Mr.Throne
- @Description: 消费者
*/
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
String queueName1 = "test_fanout_queue1";
String queueName2 = "test_fanout_queue2";
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设定参数 ip地址
factory.setHost("121.196.161.240");
factory.setPort(5672);
factory.setVirtualHost("/admin");
factory.setUsername("admin");
factory.setPassword("admin");
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
// 参数String queue名称 没有自动创建
// boolean durable 是否持久化
// boolean exclusive 是否独占 连接关闭后是否删除队列
// boolean autoDelete 没有Consumer时 自动删除
// Map<String, Object> arguments [能源期货](https://www.gendan5.com/cf/ef.html)参数信息
channel.queueDeclare("work_queues",true,false,false,null);
//6.接收消息
// String queue 队列名称
// boolean autoAck 是否自动确认
// Consumer callback 回调对象
Consumer consumer = new DefaultConsumer(channel){
//收到消息后的回调方法
//consumerTag 消息标识
//envelope 获取交换机 路由等信息
//body 数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息保存到数据库");
}
};
channel.basicConsume("test_fanout_queue2",true,consumer);
}
}
以上是关于RabbitMQ--发布订阅模式的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)
RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式