activeMQ队列模式和主题模式的Java实现

Posted aston

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activeMQ队列模式和主题模式的Java实现相关的知识,希望对你有一定的参考价值。

一、队列模式

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;



public class AppProducer {
    public static final String url = "tcp://127.0.0.1:61616";
    public static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException{
        //1. 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
        
        //2. 创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3. 启动链接
        connection.start();
        
        //4. 创建会话
        Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
        
        //5. 创建一个目标
        Destination destination = session.createQueue( queueName);
        
        //6. 创建一个生产者
        MessageProducer producer = session.createProducer( destination);
        
        for( int i=0; i<100; i++){
            //7. 创建消息
            TextMessage textMessage = session.createTextMessage( "test" + i);
            //8. 发布消息
            producer.send( textMessage);
            
            System.out.println( "发送消息" + textMessage.getText());
        }
        
        //9. 关闭链接
        connection.close();
    }

}

 

消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AppConsumer {
    public static final String url = "tcp://127.0.0.1:61616";
    public static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException{
        //1. 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
        
        //2. 创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3. 启动链接
        connection.start();
        
        //4. 创建会话
        Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
        
        //5. 创建一个目标
        Destination destination = session.createQueue( queueName);
        
        //6. 创建一个消费者
        MessageConsumer consumer = session.createConsumer( destination);
        
        //7. 创建一个监听器
        consumer.setMessageListener( new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = ( TextMessage) message;
                try{
                    System.out.println( "0接收消息" + textMessage.getText());
                }catch( JMSException e){
                    e.printStackTrace();
                }
                
            }
        });
        
        //8. 关闭链接
        //connection.close();
    }
}

 

二、主题模式

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;



public class AppProducer {
    public static final String url = "tcp://127.0.0.1:61616";
    public static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException{
        //1. 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
        
        //2. 创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3. 启动链接
        connection.start();
        
        //4. 创建会话
        Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
        
        //5. 创建一个目标
        Destination destination = session.createTopic( topicName);
        
        //6. 创建一个生产者
        MessageProducer producer = session.createProducer( destination);
        
        for( int i=0; i<100; i++){
            //7. 创建消息
            TextMessage textMessage = session.createTextMessage( "test" + i);
            //8. 发布消息
            producer.send( textMessage);
            
            System.out.println( "发送消息" + textMessage.getText());
        }
        
        //9. 关闭链接
        connection.close();
    }

}

消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AppConsumer {
    public static final String url = "tcp://127.0.0.1:61616";
    public static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException{
        //1. 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
        
        //2. 创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3. 启动链接
        connection.start();
        
        //4. 创建会话
        Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
        
        //5. 创建一个目标
        Destination destination = session.createTopic( topicName);
        
        //6. 创建一个消费者
        MessageConsumer consumer = session.createConsumer( destination);
        
        //7. 创建一个监听器
        consumer.setMessageListener( new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = ( TextMessage) message;
                try{
                    System.out.println( "0接收消息" + textMessage.getText());
                }catch( JMSException e){
                    e.printStackTrace();
                }
                
            }
        });
        
        //8. 关闭链接
        //connection.close();
    }
}

 

以上是关于activeMQ队列模式和主题模式的Java实现的主要内容,如果未能解决你的问题,请参考以下文章

activemq消息队列和kafka有啥区别

ActiveMQ--模式(队列模式/主题模式)

ActiveMQ队列主题模式区别

ActiveMQ的两种消息模式,主题队列

JMS消息队列ActiveMQ(发布/订阅模式)

消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)