activemq的使用

Posted lin-nest

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq的使用相关的知识,希望对你有一定的参考价值。

1. activemq的使用(点对点)

  (1) 创建父工程(pom工程),定义activemq的版本号

<activemq.version>5.13.4</activemq.version>

  (2) 创建子工程(war工程),引入activemq依赖

<dependencies>
    <!-- activemq -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
    </dependency>
</dependencies>

  (3) 创建producer(生产者)

package com.demo.activemq.producer.service;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActivemqProducer {

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemq世界");
        //8.发送消息
        producer.send(textMessage);
        System.out.println("ActivemqProducer发送了消息");
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

  (4) 创建consumer(消费者)

package com.demo.activemq.producer.service;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActivemqConsumer {

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);
        System.out.println("消费者正在运行");
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });    
        //8.等待键盘输入
        System.in.read();    
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

  (5) 运行consumer(消费者)

技术分享图片

  (6) 运行producer(生产者)

技术分享图片

  (7) 消费者运行结果

技术分享图片

2. activemq的使用(订阅)

  (1) 创建生产者

package com.demo.activemq.producer.service;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActivemqTopicProducer {

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemqTopic世界");
        //8.发送消息
        producer.send(textMessage);
        System.out.println("ActivemqTopicProducer发送了消息");
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

   (2) 创建topic消费者1

package com.demo.activemq.producer.service;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActivemqTopicConsumer1 {

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);
        System.out.println("消费者1正在运行");
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });    
        //8.等待键盘输入
        System.in.read();    
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

  (3) 创建topic消费者2

package com.demo.activemq.producer.service;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActivemqTopicConsumer2 {

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);
        System.out.println("消费者2正在运行");
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });    
        //8.等待键盘输入
        System.in.read();    
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

   (4) 运行,结果略

以上是关于activemq的使用的主要内容,如果未能解决你的问题,请参考以下文章

wildfly 实践5 ---分布式服务中的JMS服务访问

ActiveMQ的安全机制使用及其源代码分析 [转]

ActiveMQ——activemq的使用java代码实例

ActiveMQ入门案例-生产者代码实现

Java之activeMQ的使用

ActiveMQ消息队列的使用及应用