ActiveMQ——activemq的使用java代码实例

Posted 低调人生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ——activemq的使用java代码实例相关的知识,希望对你有一定的参考价值。

ActiveMQ 在 Java 中的使用示例:通过单例模式和工厂模式封装连接工厂、JmsTemplate、消费者(监听器)与生产者。

一、导入 jar 包(Maven 依赖)

<!-- SLF4J binding for log4j 1.2. The version is aligned with jul-to-slf4j below;
     the previous value 4.12 is not an SLF4J release number. -->
<dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- Bridge that routes java.util.logging records to SLF4J. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>1.6.1</version>
        </dependency>

<!-- ActiveMQ; the Java code below uses classes from org.apache.activemq. -->
<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.3</version>
        </dependency>
        <!-- Spring JMS; the Java code below uses JmsTemplate, CachingConnectionFactory and
             SimpleMessageListenerContainer from it. The Spring modules listed under
             <exclusions> are removed from its transitive dependencies.
             NOTE(review): presumably the project supplies its own versions of these
             modules; confirm they are on the classpath. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.1.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-beans</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-tx</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

 

二、java代码

1、连接工厂 配置

package com.broadsense.iov.base.jms;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
/**
 * Connection factory configuration.
 *
 * <p>Holds one lazily built {@link javax.jms.ConnectionFactory}: an
 * {@link ActiveMQConnectionFactory} wrapped in a Spring {@link CachingConnectionFactory}
 * whose session cache size is taken from the configuration below.
 *
 * <p>NOTE(review): broker address and credentials are hard-coded in this class; consider
 * moving them to external configuration.
 *
 * @author flm
 * 2017-10-13
 */
public class ConnectionFactory
{
  // NOTE(review): this is the address the code actually connected to. The previously declared
  // (but never used) URL constant pointed at tcp://113.106.93.254:61616 - confirm which broker
  // is intended.
  private static final String URL = "tcp://100.100.10.100:61616";
  private static final String USERNAME = "hkadmin";
  private static final String PASSWORD = "hk667";
  private static final int SESSIONCACHESIZE = 20;

  /** The shared JMS connection factory; {@code null} until the first {@link #getInstance()} call. */
  private javax.jms.ConnectionFactory factory;

  /** Instances are only created through {@link SingletonHolder}. */
  private ConnectionFactory()
  {
  }

  /**
   * Returns the shared JMS connection factory, building it on first use.
   *
   * <p>The method is {@code static synchronized}, so the lazy build runs at most once even
   * when several threads call it concurrently.
   *
   * @return the shared connection factory
   */
  public static synchronized javax.jms.ConnectionFactory getInstance()
  {
    ConnectionFactory holder = SingletonHolder.INSTANCE;
    if (holder.factory == null) {
      holder.build();
    }
    return holder.factory;
  }

  /** Loads the configuration and creates the connection factory from it. */
  private void build()
  {
    AMQConfigBean bean = loadConfigure();
    this.factory = buildConnectionFactory(bean);
  }

  /**
   * Creates an ActiveMQ connection factory and wraps it in a {@link CachingConnectionFactory}.
   *
   * @param bean broker URL, credentials and session cache size to apply
   * @return the caching connection factory
   */
  private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {
    javax.jms.ConnectionFactory targetFactory =
        new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL());

    CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
    cachingFactory.setTargetConnectionFactory(targetFactory);
    cachingFactory.setSessionCacheSize(bean.getSessionCacheSize());

    return cachingFactory;
  }

  /**
   * Builds the configuration from the class constants, which are the single source of truth
   * for the broker settings.
   *
   * @return the broker configuration
   */
  private AMQConfigBean loadConfigure() {
    return new AMQConfigBean(URL, USERNAME, PASSWORD, SESSIONCACHESIZE);
  }

  /** Immutable holder for the broker connection settings. */
  private static final class AMQConfigBean
  {
    private final String brokerURL;
    private final String userName;
    private final String password;
    private final int sessionCacheSize;

    AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {
      this.brokerURL = brokerURL;
      this.userName = userName;
      this.password = password;
      this.sessionCacheSize = sessionCacheSize;
    }

    String getBrokerURL() {
      return this.brokerURL;
    }

    String getUserName() {
      return this.userName;
    }

    String getPassword() {
      return this.password;
    }

    int getSessionCacheSize() {
      return this.sessionCacheSize;
    }
  }

  /** Holds the single {@link ConnectionFactory} instance; created when this class is first used. */
  private static class SingletonHolder
  {
    static final ConnectionFactory INSTANCE = new ConnectionFactory();
  }
}

 

2、模版

package com.broadsense.iov.base.jms;

import org.springframework.jms.core.JmsTemplate;

/**
 * Template factory.
 *
 * <p>Singleton that hands out two lazily created {@link JmsTemplate} instances built on the
 * shared connection factory from {@link ConnectionFactory#getInstance()}: one with the
 * pub/sub domain flag set to {@code true} (topics) and one with it set to {@code false}
 * (queues).
 *
 * @author flm
 * 2017-10-13
 */
public class JmsTemplateFactory
{
  private final javax.jms.ConnectionFactory factory;
  private JmsTemplate topicJmsTemplate;
  private JmsTemplate queueJmsTemplate;

  /**
   * Returns the single factory instance.
   *
   * @return the shared {@code JmsTemplateFactory}
   */
  public static JmsTemplateFactory getInstance()
  {
    return SingletonHolder.INSTANCE;
  }

  /** Captures the shared connection factory; instances are only created by {@link SingletonHolder}. */
  private JmsTemplateFactory()
  {
    this.factory = ConnectionFactory.getInstance();
  }

  /**
   * Returns the template used for topics, creating it on first use.
   *
   * @return the template whose pub/sub domain flag is {@code true}
   */
  public synchronized JmsTemplate getTopicJmsTemplate() {
    if (this.topicJmsTemplate == null) {
      this.topicJmsTemplate = createTemplate(this.factory, true);
    }
    return this.topicJmsTemplate;
  }

  /**
   * Returns the template used for queues, creating it on first use.
   *
   * @return the template whose pub/sub domain flag is {@code false}
   */
  public synchronized JmsTemplate getQueueJmsTemplate() {
    if (this.queueJmsTemplate == null) {
      this.queueJmsTemplate = createTemplate(this.factory, false);
    }
    return this.queueJmsTemplate;
  }

  /**
   * Creates a template on the given connection factory.
   *
   * @param factory connection factory the template sends through
   * @param pubSubDomain value passed to {@link JmsTemplate#setPubSubDomain(boolean)}
   * @return the configured template
   */
  private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {
    JmsTemplate template = new JmsTemplate(factory);
    template.setPubSubDomain(pubSubDomain);
    return template;
  }

  /** Holds the single {@link JmsTemplateFactory} instance; created when this class is first used. */
  public static class SingletonHolder
  {
    static final JmsTemplateFactory INSTANCE = new JmsTemplateFactory();
  }
}

 

3、消费者 模版

package com.broadsense.iov.base.jms;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
/**
 * JMS listener: creates consumers.
 *
 * <p>Each {@code start...} method creates a {@link SimpleMessageListenerContainer} on the
 * shared connection factory, attaches the given {@link MessageListener} and starts it.
 * Started destinations are recorded in an internal map; queues and topics use different key
 * prefixes so that a topic and a queue with the same name do not collide.
 *
 * <p>NOTE(review): the created containers are not retained, so this class offers no way to
 * stop a listener once started.
 *
 * @author flm
 * 2017-10-13
 */
public class JMSListener
{
  private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);
  private static final String QUEUE_KEY_PREFIX = "QUEUE_";
  private static final String TOPIC_KEY_PREFIX = "TOPIC_";
  private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap<String, Destination>();

  /**
   * Starts a listener on a queue without a subscription name.
   *
   * @param queueName name of the queue to consume from
   * @param listener callback that receives the messages
   */
  public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
  {
    startJmsQueueListener(queueName, null, listener);
  }

  /**
   * Starts a listener on a queue unless one was already started for that queue name, in
   * which case a warning is logged and nothing else happens.
   *
   * @param queueName name of the queue to consume from
   * @param subName subscription name set on the container; ignored when {@code null} or empty
   * @param listener callback that receives the messages
   */
  public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {
    String key = QUEUE_KEY_PREFIX + queueName;
    if (MQDESTS.containsKey(key)) {
      LOGGER.warn("{} already started", queueName);
      return;
    }
    ActiveMQQueue mq = new ActiveMQQueue(queueName);
    startJmsListener(mq, subName, listener);
    // Registered only after a successful start so that a failed start can be retried.
    MQDESTS.put(key, mq);
  }

  /**
   * Starts a listener on a topic without a subscription name.
   *
   * @param topicName name of the topic to subscribe to
   * @param listener callback that receives the messages
   */
  public static synchronized void startJmsTopicListener(String topicName, MessageListener listener)
  {
    startJmsTopicListener(topicName, null, listener);
  }

  /**
   * Starts a listener on a topic. Unlike the queue variant there is no duplicate check, so
   * every call starts a new container.
   *
   * @param topicName name of the topic to subscribe to
   * @param subName durable subscription name; ignored when {@code null} or empty
   * @param listener callback that receives the messages
   */
  public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {
    ActiveMQTopic mq = new ActiveMQTopic(topicName);
    startJmsListener(mq, subName, listener);
    // Topics get their own key prefix: registering them under the queue prefix made a later
    // startJmsQueueListener call for the same name report "already started".
    MQDESTS.put(TOPIC_KEY_PREFIX + topicName, mq);
  }

  /**
   * Creates, configures and starts a listener container for the destination.
   *
   * @param dest destination to consume from
   * @param subName durable subscription name; not applied when {@code null} or empty
   * @param msgListener callback that receives the messages
   */
  private static void startJmsListener(Destination dest, String subName, MessageListener msgListener)
  {
    javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance();

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.setDestination(dest);
    container.setMessageListener(msgListener);
    // Compare by content: a reference comparison with "" treats an empty string built at
    // runtime as a real subscription name.
    if (subName != null && !subName.isEmpty()) {
      container.setDurableSubscriptionName(subName);
    }
    container.start();
  }
}

 

4、生产者 模版

package com.broadsense.iov.base.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * 创建 jms生产者
 * 
 * @author flm
 * 2017年10月13日
 */
public class JMSPublisher
{
  public static void sendTopicMessage(String dest, String msg)
  {
    JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg)
    {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(this.val$msg);
      }
    });
  }

  public static void sendQueueMessage(String dest, String msg)
  {
    JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg)
    {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(this.val$msg);
      }
    });
  }
}

 






以上是关于ActiveMQ——activemq的使用java代码实例的主要内容,如果未能解决你的问题,请参考以下文章

Java之activeMQ的使用

Java ActiveMQ 讲解理解JMS 和 ActiveMQ基本使用(转)

使用Java编写ActiveMQ的队列模式和主题模式

ActiveMQ发送消息

ActiveMQ、STOMP、Java 示例

java 消息机制 ActiveMQ入门实例