ActiveMQ消息发送与接收

Posted 四季常青

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ消息发送与接收相关的知识,希望对你有一定的参考价值。

推荐文章:ActiveMQ讯息传送机制以及ACK机制

 

ActiveMQ发送消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:发送消息

消息发送类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final String USERNAME = "admin";
    
    private static final String PASSWORD = "admin";
    
    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
    
    private AtomicInteger count = new AtomicInteger();
    
    private ConnectionFactory connectionFactory;
    
    private Connection connection;
    
    private Session session;
    
    private Queue queue;
    
    private MessageProducer producer;
    
    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = connectionFactory.createConnection();
            //启动链接,不启动不影响消息的发送,但影响消息的接收
            connection.start();
            //创建一个事物session
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            //获取消息发送的目的地,指消息发往那个地方
            queue = session.createQueue("test");
            //获取消息发送的生产者
            producer = session.createProducer(queue);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public void sendMsg(String queueName) {
        try {
            int num = count.getAndIncrement();
            TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:生产者发送消息!,count:"+num);
            producer.send(msg);
                 
            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
}
connection.createSession方法
 /**
     * Creates a <CODE>Session</CODE> object.
     *
     * @param transacted indicates whether the session is transacted
     * @param acknowledgeMode indicates whether the consumer or the client will
     *                acknowledge any messages it receives; ignored if the
     *                session is transacted. Legal values are
     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
     * @return a newly created session
     * @throws JMSException if the <CODE>Connection</CODE> object fails to
     *                 create a session due to some internal error or lack of
     *                 support for the specific transaction and acknowledgement
     *                 mode.
     * @see Session#AUTO_ACKNOWLEDGE
     * @see Session#CLIENT_ACKNOWLEDGE
     * @see Session#DUPS_OK_ACKNOWLEDGE
     * @since 1.1
     */
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        if(!transacted) {
            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
            }
        }
        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
    }
createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:
1:AUTO_ACKNOWLEDGE 自动确认
2:CLIENT_ACKNOWLEDGE 客户端手动确认 
3:DUPS_OK_ACKNOWLEDGE 自动批量确认
0:SESSION_TRANSACTED 事务提交并确认
4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE
各种确认模式详细说明可以看文章:ActiveMQ讯息传送机制以及ACK机制
从createSession方法中可以看出如果如果session不使用事务但是却使用了消息提交(SESSION_TRANSACTED)确认模式,或使用的消息确认模式不存在,将抛出异常。

ActiveMQ接收消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:接收消息或设置消息监听器

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";
    
    private static final String PASSWORD = "admin";
    
    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
    
    private AtomicInteger count = new AtomicInteger();
    
    private ConnectionFactory connectionFactory;
    
    private ActiveMQConnection connection;
    
    private Session session;
    
    private Queue queue;
    
    private MessageConsumer consumer;
    
    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = (ActiveMQConnection) connectionFactory.createConnection();
            
            //启动链接
            connection.start();
            //创建一个事物session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //获取消息接收的目的地,指从哪里接收消息
            queue = session.createQueue("test");
            //获取消息接收的消费者
            consumer = session.createConsumer(queue);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public void receiver(String queueName) {
        
        try {
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}
consumer.receive方法
/**
     * Receives the next message produced for this message consumer.
     * <P>
     * This call blocks indefinitely until a message is produced or until this
     * message consumer is closed.
     * <P>
     * If this <CODE>receive</CODE> is done within a transaction, the consumer
     * retains the message until the transaction commits.
     *
     * @return the next message produced for this message consumer, or null if
     *         this message consumer is concurrently closed
     */
    public Message receive() throws JMSException {
        checkClosed(); //检查unconsumedMessages是否关闭 ,消费者从unconsumedMessages对象中获取消息
        checkMessageListener(); //检查是否有其他消费者使用了监听器,同一消息消息队列中不能采用reveice和messageListener并存消费消息

        sendPullCommand(0); //如果prefetchSize为空且unconsumedMessages为空 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备
        MessageDispatch md = dequeue(-1); //从unconsumedMessages取出一个消息 
        if (md == null) {
            return null;
        }

        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false);

        return createActiveMQMessage(md);
    }
prefetchSize属性如果大于0,消费者每次拉去消息时都会预先拉取一定量的消息,拉取的消息数量<=prefetchSize,prefetchSize默认指为1000,这个默认值是从connection中传过来的
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        checkClosed();

        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createConsumer(this, messageSelector, noLocal);
        }

        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
        int prefetch = 0;
        if (destination instanceof Topic) {
            prefetch = prefetchPolicy.getTopicPrefetch();
        } else {
            prefetch = prefetchPolicy.getQueuePrefetch();
        }
        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
    }

消息接收类代码中调用session.createConsumer其实调用的就是上面的createConsumer方法,从上面代码中可以看出connection会将自己的prefetch传递给消费者,connection中的ActiveMQPrefetchPolicy

对象属性如下:

public class ActiveMQPrefetchPolicy extends Object implements Serializable {
    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
    public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
    public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);

    private int queuePrefetch;
    private int queueBrowserPrefetch;
    private int topicPrefetch;
    private int durableTopicPrefetch;
    private int optimizeDurableTopicPrefetch;
    private int inputStreamPrefetch;
    private int maximumPendingMessageLimit;

    /**
     * Initialize default prefetch policies
     */
    public ActiveMQPrefetchPolicy() {
        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
        this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
        this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
    }

这里我只截了一部分代码,可以看到队列(queue)默认的queuePrefetch为1000,queuePrefetch的最大值不能超过MAX_PREFETCH_SIZE(32767)

当然我们也可以自己设置消费者预先拉取的消息数量,方法有两种

一:在创建connection之后修改connection中的queuePrefetch;代码如下:

ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(number);
connection.setPrefetchPolicy(prefetchPolicy);

二:在创建队列(queue)的时候传入参数,回到ActiveMQMessageConsumer的创建代码中:

 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
            String name, String selector, int prefetch,
            int maximumPendingMessageCount, boolean noLocal, boolean browser,
            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
        if (dest == null) {
            throw new InvalidDestinationException("Don‘t understand null destinations");
        } else if (dest.getPhysicalName() == null) {
            throw new InvalidDestinationException("The destination object was not given a physical name.");
        } else if (dest.isTemporary()) {
            String physicalName = dest.getPhysicalName();

            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }

            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();

            if (physicalName.indexOf(connectionID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }

            if (session.connection.isDeleted(dest)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
            if (prefetch < 0) {
                throw new JMSException("Cannot have a prefetch size less than zero");
            }
        }
        if (session.connection.isMessagePrioritySupported()) {
            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
        }else {
            this.unconsumedMessages = new FifoMessageDispatchChannel();
        }

        this.session = session;
        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
        setTransformer(session.getTransformer());

        this.info = new ConsumerInfo(consumerId);
        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
        this.info.setSubscriptionName(name);
        this.info.setPrefetchSize(prefetch);
        this.info.setCurrentPrefetchSize(prefetch);
        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
        this.info.setNoLocal(noLocal);
        this.info.setDispatchAsync(dispatchAsync);
        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
        this.info.setSelector(null);

        // Allows the options on the destination to configure the consumerInfo
        if (dest.getOptions() != null) {
            Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);
            if (options.size() > 0) {
                String msg = "There are " + options.size()
                    + " consumer options that couldn‘t be set on the consumer."
                    + " Check the options are spelled correctly."
                    + " Unknown parameters=[" + options + "]."
                    + " This consumer cannot be started.";
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }

        this.info.setDestination(dest);
        this.info.setBrowser(browser);
        if (selector != null && selector.trim().length() != 0) {
            // Validate the selector
            SelectorParser.parse(selector);
            this.info.setSelector(selector);
            this.selector = selector;
        } else if (info.getSelector() != null) {
            // Validate the selector
            SelectorParser.parse(this.info.getSelector());
            this.selector = this.info.getSelector();
        } else {
            this.selector = null;
        }

        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
                                   && !info.isBrowser();
        if (this.optimizeAcknowledge) {
            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
        }

        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
        this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
        if (messageListener != null) {
            setMessageListener(messageListener);
        }
        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(info);
        } catch (JMSException e) {
            this.session.removeConsumer(this);
            throw e;
        }

        if (session.connection.isStarted()) {
            start();
        }
    }

this.info.setPrefetchSize(prefetch);

又上面代码可以看出,在创建ActiveMQMessageConsumer的过程中,程序会将connection中的queuePrefetch赋给ActiveMQMessageConsumer对象中的info对象(info为一个ConsumerInfo对象)

Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);

在创建队列(queue)的过程中,我们可以传一些参数来配置消费者,这些参数的前缀必须为consumer. ,当我们传的参数与info对象中的属性匹配时,将覆盖info对象中的属性值,其传参形式如下:

queueName?param1=value1&param2=value2

所以我们如果想改变消费者预先拉取的消息数量,可以在创建对象的时候传入如下参数

queue = session.createQueue("test?consumer.prefetchSize=number");

 

ActiveMq接收消息--监听器

监听器代码如下:

package com.apt.study.util.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ReceiveListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
             TextMessage msg = (TextMessage) message;
             if(msg!=null) {
                 System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText());
             }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";
    
    private static final String PASSWORD = "admin";
    
    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
    
    private AtomicInteger count = new AtomicInteger();
    
    private ConnectionFactory connectionFactory;
    
    private ActiveMQConnection connection;
    
    private Session session;
    
    private Queue queue;
    
    private MessageConsumer consumer;
    
    public void init() {
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //从工厂中创建一个链接
            connection = (ActiveMQConnection) connectionFactory.createConnection();
            
            //启动链接
            connection.start();
            //创建一个事物session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            queue = session.createQueue("test");
            
            consumer = session.createConsumer(queue);
            //设置消息监听器
            consumer.setMessageListener(new ReceiveListener());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
}
consumer.setMessageListener方法:
public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        if (info.getPrefetchSize() == 0) {
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }
        if (listener != null) {
            boolean wasRunning = session.isRunning();
            if (wasRunning) {
                session.stop();
            }

            this.messageListener.set(listener);
            session.redispatch(this, unconsumedMessages);

            if (wasRunning) {
                session.start();
            }
        } else {
            this.messageListener.set(null);
        }
    }

从代码中可以看出,当我们使用监听器时,消费者prefetchSize必须大于0

 

 

 
 

以上是关于ActiveMQ消息发送与接收的主要内容,如果未能解决你的问题,请参考以下文章

php ActiveMQ的发送消息,与处理消息

学习ActiveMQ:JMS消息的确认与重发机制

学习ActiveMQ:点对点(队列)模式消息演示

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用