5RabbitMQ-订阅模式 Publish/Subscribe

Posted mrchengs

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5RabbitMQ-订阅模式 Publish/Subscribe相关的知识,希望对你有一定的参考价值。

http://www.rabbitmq.com/tutorials/tutorial-three-java.html

 

1、模型图

我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多
个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

技术图片

在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

 

举列:
类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

 

解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

 

 2、代码实践

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class Send {
     private static final String  EXCHANGE_NAME="test_exchange_fanout";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           
           Channel channel = conn.createChannel();
           
           //声明交换机
           channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
           
           //发送信息
           String msg = "hello";
           
           channel.basicPublish(EXCHANGE_NAME, "", null,  msg.getBytes());
           
           channel.close();
           conn.close();
     }
}

 

 技术图片

 

 

但是这个发送的消息到哪了呢? 
消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
能力.因为这时还没有队列,所以就会丢失;
小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!

 

消费者1

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;
public class Receive {
     
     private static final String QUEUE_NAME="test_queue1";
     private static final String  EXCHANGE_NAME="test_exchange_fanout";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           
           Channel channel = conn.createChannel();
           
           //队列声明
           channel.queueDeclare(QUEUE_NAME, false, false, false,  null);
           
           //绑定队列到交换机转发器
           channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
           
                     //定义一个消费者
                     Consumer consumer = new  DefaultConsumer(channel){
                           //收到消息就会触发这个方法
                           @Override
                           public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                     throws IOException {
                                String msg = new  String(body,"utf-8");
                                System.out.println("消费者1接收到的消息" + msg);
                                
                                try {
                                     Thread.sleep(1500);
                                } catch (InterruptedException e)  {
                                     e.printStackTrace();
                                }finally{
                                     System.out.println("消费者1处理完成!");
                                     //手动回执
                                     channel.basicAck(envelope.getDeliveryTag(), false);
                                }
                                
                           }
                     };
                     //监听队列
                     //自动应答false
                     boolean autoAck = false;
                     channel.basicConsume(QUEUE_NAME, autoAck,  consumer);
     }
}

 

 

消费者2

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive2 {
    
    private static final String QUEUE_NAME="test_queue";
    private static final String EXCHANGE_NAME="test_exchange_fanout";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection conn = ConnectionUtils.getConnection();
        
        Channel channel = conn.createChannel();
        
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        //绑定队列到交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        
                
                //定义一个消费者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就会触发这个方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消费者2接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消费者2处理完成!");
                            //手动回执
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                        
                    }
                };
                //监听队列
                //自动应答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

 

一个消息 可以被多个消费者

技术图片

 

 技术图片

 

 后台进行查看:

技术图片

 

以上是关于5RabbitMQ-订阅模式 Publish/Subscribe的主要内容,如果未能解决你的问题,请参考以下文章

EventBus发布-订阅模式 ( 使用代码实现发布-订阅模式 )

从发布-订阅模式到消息队列

观察者模式 vs 发布-订阅模式

RedisRedis 发布订阅通信模式 ( 发布订阅模式 | 订阅频道 | 发布消息 | 接收消息 )

设计模式Javascript设计模式——订阅发布模式

EventBus发布-订阅模式 ( Android 中使用 发布-订阅模式 进行通信 )