消息队列中点对点与发布订阅区别

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列中点对点与发布订阅区别相关的知识,希望对你有一定的参考价值。

参考技术A

JMS规范支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。

1. 点对点
生产者生产消息发送到queue中,然后消费者从queue中取出并且消费消息。这里要注意:
消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
2. 发布/订阅
生产者将消息发布到topic中,同时有多个消费者订阅该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

小结
queue实现了负载均衡,一条消息只能被一个消费者接收,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者,一个queue可以有很多消费者,他们之间实现了负载均衡, 所以Queue实现了一个可靠的负载均衡。 topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝

疑问
发布订阅模式下,能否实现订阅者负载均衡消费呢?当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,
这样订阅者很容易实现消费能力线性扩展。

传统企业型消息队列ActiveMQ遵循了JMS规范,实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ、Kafka并没有遵循老态龙钟的JMS规范,是通过什么方式实现消费负载均衡、多订阅呢?

RabbitMQ实现了AQMP协议,AQMP协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。此外RabbitMQ是向消费端推送消息,订阅关系和消费状态保存在服务端。

生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。

当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费此消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。所以支持多订阅时,消息会多个拷贝。

Kafka只支持消息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

同一个订阅组会消费topic所有消息,每条消息只会被同一个订阅组的一个消费节点消费,同一个订阅组内不同消费节点会消费不同消息

转自: https://blog.csdn.net/lizhitao/article/details/47723105

ActiveMQ之队列和主题发布订阅实例

JMS 消息模型

  JMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。  

  (1)点对点模型(Queue)

    一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。

    模型特点:只有一个消费者获得消息。

  (2)发布者/订阅者模型(Topic)

    0个或多个订阅者可以接受特定主题的消息。

    模型特点:多个消费者可获得消息。

    Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

JMS消息格式

  • MapMessage -- key-value键值对

  • TextMessage -- 字符串对象

  • ObjcetMessage -- 一个序列化的Java对象

  • ByteMessage -- 一个未解释字节的数据流

  • StreamMessage -- Java原始值的数据流

 

增加maven依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.10.0</version>
</dependency>

 

点对点

 

生产者

package com.zns.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory factory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        // 消息创建者
        MessageProducer messageProducer;
        try {
            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建队列(返回一个消息目的地)
            destination = session.createQueue("Queue1");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 创建TextMessage消息实体
            TextMessage message = session.createTextMessage("hello,world!");
            messageProducer.send(message);
            session.commit();
            System.out.println("生产了消息...");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

 

消费者

package com.zns.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建队列(返回一个消息目的地)
            destination = session.createQueue("Queue1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new MyListerner());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

 

消息监听器

package com.zns.activemq.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListerner implements MessageListener{
    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

 

发布订阅

 

生产者

package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory factory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        // 消息创建者
        MessageProducer messageProducer;
        try {
            factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息生产者
            messageProducer = session.createProducer(destination);
            // 创建TextMessage消息实体
            TextMessage message = session.createTextMessage("hello,world!");
            messageProducer.send(message);
            session.commit();
            System.out.println("生产了消息...");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

 

消费者

package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new Listerner1());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package com.zns.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer2 {
    public static void main(String[] args) {
         // 连接工厂
        ConnectionFactory connectionFactory;
        // 连接实例
        Connection connection = null;
        // 收发的线程实例
        Session session;
        // 消息发送目标地址
        Destination destination;
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // 获取连接实例
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建接收或发送的线程实例(消费者就不需要开启事务了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 创建Topic
            destination = session.createTopic("Topic1");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //注册消息监听
            consumer.setMessageListener(new Listerner2());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

 

消息监听器

package com.zns.activemq.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listerner1 implements MessageListener{
    public void onMessage(Message message) {
        try {
             System.out.println("订阅者1接收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package com.zns.activemq.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listerner2 implements MessageListener{
    public void onMessage(Message message) {
        try {
             System.out.println("订阅者2接收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

以上是关于消息队列中点对点与发布订阅区别的主要内容,如果未能解决你的问题,请参考以下文章

消息队列中点对点与发布订阅区别(转)

消息队列两种模式:点对点与发布订阅

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

ActiveMQ之队列和主题发布订阅实例

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用