RabbitMQ--发布订阅模式

Posted Gendan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ--发布订阅模式相关的知识,希望对你有一定的参考价值。

消费者1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub1 {

public static void main(String[] args) throws IOException, TimeoutException {
    String queueName1 = "test_fanout_queue1";
    String queueName2 = "test_fanout_queue2";
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设定参数 ip地址
    factory.setHost("121.196.161.240");
    factory.setPort(5672);
    factory.setVirtualHost("/admin");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接 Connection
    Connection connection = factory.newConnection();
    //4.创建Channel
    Channel channel = connection.createChannel();
    //5.创建队列Queue
    // 参数String queue名称 没有自动创建
    // boolean durable 是否持久化
    // boolean exclusive 是否独占 连接关闭后是否删除队列
    // boolean autoDelete 没有Consumer时 自动删除
    // Map<String, Object> arguments 参数信息
    channel.queueDeclare("work_queues",true,false,false,null);
    //6.接收消息
    // String queue 队列名称
    // boolean autoAck 是否自动确认
    // Consumer callback 回调对象
    Consumer consumer = new DefaultConsumer(channel){
        //收到消息后的回调方法
        //consumerTag 消息标识
        //envelope 获取交换机 路由等信息
        //body 数据
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);

            System.out.println("body:"+new String(body));
            System.out.println("将日志信息打印到控制台");
        }
    };
    channel.basicConsume("test_fanout_queue1",true,consumer);
}

}
消费者2:
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub2 {

public static void main(String[] args) throws IOException, TimeoutException {
    String queueName1 = "test_fanout_queue1";
    String queueName2 = "test_fanout_queue2";
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设定参数 ip地址
    factory.setHost("121.196.161.240");
    factory.setPort(5672);
    factory.setVirtualHost("/admin");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接 Connection
    Connection connection = factory.newConnection();
    //4.创建Channel
    Channel channel = connection.createChannel();
    //5.创建队列Queue
    // 参数String queue名称 没有自动创建
    // boolean durable 是否持久化
    // boolean exclusive 是否独占 连接关闭后是否删除队列
    // boolean autoDelete 没有Consumer时 自动删除
    // Map<String, Object> arguments [能源期货](https://www.gendan5.com/cf/ef.html)参数信息
    channel.queueDeclare("work_queues",true,false,false,null);
    //6.接收消息
    // String queue 队列名称
    // boolean autoAck 是否自动确认
    // Consumer callback 回调对象
    Consumer consumer = new DefaultConsumer(channel){
        //收到消息后的回调方法
        //consumerTag 消息标识
        //envelope 获取交换机 路由等信息
        //body 数据
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);

            System.out.println("body:"+new String(body));
            System.out.println("将日志信息保存到数据库");
        }
    };
    channel.basicConsume("test_fanout_queue2",true,consumer);

}

}

以上是关于RabbitMQ--发布订阅模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

NetCore RabbitMQ 发布订阅模式,消息广播

RabbitMQ--发布订阅模式

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

RabbitMQ入门:发布/订阅(Publish/Subscribe)