ActiveMQ消息发送与接收
Posted 四季常青
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ消息发送与接收相关的知识,希望对你有一定的参考价值。
ActiveMQ发送消息
1:创建链接工厂ConnectionFactory
2:创建链接Connection
3:启动session
4:创建消息发送目的地
5:创建生产者
6:发送消息
消息发送类:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private Connection connection; private Session session; private Queue queue; private MessageProducer producer; public void init() { try { //创建一个链接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //从工厂中创建一个链接 connection = connectionFactory.createConnection(); //启动链接,不启动不影响消息的发送,但影响消息的接收 connection.start(); //创建一个事物session session = connection.createSession(true, Session.SESSION_TRANSACTED); //获取消息发送的目的地,指消息发往那个地方 queue = session.createQueue("test"); //获取消息发送的生产者 producer = session.createProducer(queue); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void sendMsg(String queueName) { try { int num = count.getAndIncrement(); TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:生产者发送消息!,count:"+num); producer.send(msg); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
connection.createSession方法
/** * Creates a <CODE>Session</CODE> object. * * @param transacted indicates whether the session is transacted * @param acknowledgeMode indicates whether the consumer or the client will * acknowledge any messages it receives; ignored if the * session is transacted. Legal values are * <code>Session.AUTO_ACKNOWLEDGE</code>, * <code>Session.CLIENT_ACKNOWLEDGE</code>, and * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. * @return a newly created session * @throws JMSException if the <CODE>Connection</CODE> object fails to * create a session due to some internal error or lack of * support for the specific transaction and acknowledgement * mode. * @see Session#AUTO_ACKNOWLEDGE * @see Session#CLIENT_ACKNOWLEDGE * @see Session#DUPS_OK_ACKNOWLEDGE * @since 1.1 */ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); if(!transacted) { if (acknowledgeMode==Session.SESSION_TRANSACTED) { throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); } } return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); }
createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:
1:AUTO_ACKNOWLEDGE 自动确认
2:CLIENT_ACKNOWLEDGE 客户端手动确认
3:DUPS_OK_ACKNOWLEDGE 自动批量确认
0:SESSION_TRANSACTED 事务提交并确认
4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE
各种确认模式详细说明可以看文章:ActiveMQ讯息传送机制以及ACK机制
从createSession方法中可以看出如果如果session不使用事务但是却使用了消息提交(SESSION_TRANSACTED)确认模式,或使用的消息确认模式不存在,将抛出异常。
ActiveMQ接收消息
1:创建链接工厂ConnectionFactory
2:创建链接Connection
3:启动session
4:创建消息发送目的地
5:创建生产者
6:接收消息或设置消息监听器
消息接收类:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; public class Receiver { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private ActiveMQConnection connection; private Session session; private Queue queue; private MessageConsumer consumer; public void init() { try { //创建一个链接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //从工厂中创建一个链接 connection = (ActiveMQConnection) connectionFactory.createConnection(); //启动链接 connection.start(); //创建一个事物session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取消息接收的目的地,指从哪里接收消息 queue = session.createQueue("test"); //获取消息接收的消费者 consumer = session.createConsumer(queue); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void receiver(String queueName) { try { TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
consumer.receive方法
/** * Receives the next message produced for this message consumer. * <P> * This call blocks indefinitely until a message is produced or until this * message consumer is closed. * <P> * If this <CODE>receive</CODE> is done within a transaction, the consumer * retains the message until the transaction commits. * * @return the next message produced for this message consumer, or null if * this message consumer is concurrently closed */ public Message receive() throws JMSException { checkClosed(); //检查unconsumedMessages是否关闭 ,消费者从unconsumedMessages对象中获取消息 checkMessageListener(); //检查是否有其他消费者使用了监听器,同一消息消息队列中不能采用reveice和messageListener并存消费消息 sendPullCommand(0); //如果prefetchSize为空且unconsumedMessages为空 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备 MessageDispatch md = dequeue(-1); //从unconsumedMessages取出一个消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
prefetchSize属性如果大于0,消费者每次拉去消息时都会预先拉取一定量的消息,拉取的消息数量<=prefetchSize,prefetchSize默认指为1000,这个默认值是从connection中传过来的
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { checkClosed(); if (destination instanceof CustomDestination) { CustomDestination customDestination = (CustomDestination)destination; return customDestination.createConsumer(this, messageSelector, noLocal); } ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); int prefetch = 0; if (destination instanceof Topic) { prefetch = prefetchPolicy.getTopicPrefetch(); } else { prefetch = prefetchPolicy.getQueuePrefetch(); } ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); }
消息接收类代码中调用session.createConsumer其实调用的就是上面的createConsumer方法,从上面代码中可以看出connection会将自己的prefetch传递给消费者,connection中的ActiveMQPrefetchPolicy
对象属性如下:
public class ActiveMQPrefetchPolicy extends Object implements Serializable { public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE; public static final int DEFAULT_QUEUE_PREFETCH = 1000; public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000; public static final int DEFAULT_INPUT_STREAM_PREFETCH=100; public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE; private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class); private int queuePrefetch; private int queueBrowserPrefetch; private int topicPrefetch; private int durableTopicPrefetch; private int optimizeDurableTopicPrefetch; private int inputStreamPrefetch; private int maximumPendingMessageLimit; /** * Initialize default prefetch policies */ public ActiveMQPrefetchPolicy() { this.queuePrefetch = DEFAULT_QUEUE_PREFETCH; this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH; this.topicPrefetch = DEFAULT_TOPIC_PREFETCH; this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH; this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH; this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH; }
这里我只截了一部分代码,可以看到队列(queue)默认的queuePrefetch为1000,queuePrefetch的最大值不能超过MAX_PREFETCH_SIZE(32767)
当然我们也可以自己设置消费者预先拉取的消息数量,方法有两种
一:在创建connection之后修改connection中的queuePrefetch;代码如下:
ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(number);
connection.setPrefetchPolicy(prefetchPolicy);
二:在创建队列(queue)的时候传入参数,回到ActiveMQMessageConsumer的创建代码中:
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException { if (dest == null) { throw new InvalidDestinationException("Don‘t understand null destinations"); } else if (dest.getPhysicalName() == null) { throw new InvalidDestinationException("The destination object was not given a physical name."); } else if (dest.isTemporary()) { String physicalName = dest.getPhysicalName(); if (physicalName == null) { throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); } String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); if (physicalName.indexOf(connectionID) < 0) { throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); } if (session.connection.isDeleted(dest)) { throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); } if (prefetch < 0) { throw new JMSException("Cannot have a prefetch size less than zero"); } } if (session.connection.isMessagePrioritySupported()) { this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); }else { this.unconsumedMessages = new FifoMessageDispatchChannel(); } this.session = session; this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); setTransformer(session.getTransformer()); this.info = new ConsumerInfo(consumerId); this.info.setExclusive(this.session.connection.isExclusiveConsumer()); this.info.setSubscriptionName(name); this.info.setPrefetchSize(prefetch); this.info.setCurrentPrefetchSize(prefetch); this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); this.info.setNoLocal(noLocal); this.info.setDispatchAsync(dispatchAsync); this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); this.info.setSelector(null); // Allows the options on the destination to configure the consumerInfo if (dest.getOptions() != null) { Map<String, Object> options = IntrospectionSupport.extractProperties( new HashMap<String, Object>(dest.getOptions()), "consumer."); IntrospectionSupport.setProperties(this.info, options); if (options.size() > 0) { String msg = "There are " + options.size() + " consumer options that couldn‘t be set on the consumer." + " Check the options are spelled correctly." + " Unknown parameters=[" + options + "]." + " This consumer cannot be started."; LOG.warn(msg); throw new ConfigurationException(msg); } } this.info.setDestination(dest); this.info.setBrowser(browser); if (selector != null && selector.trim().length() != 0) { // Validate the selector SelectorParser.parse(selector); this.info.setSelector(selector); this.selector = selector; } else if (info.getSelector() != null) { // Validate the selector SelectorParser.parse(this.info.getSelector()); this.selector = this.info.getSelector(); } else { this.selector = null; } this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !info.isBrowser(); if (this.optimizeAcknowledge) { this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval()); } this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery; if (messageListener != null) { setMessageListener(messageListener); } try { this.session.addConsumer(this); this.session.syncSendPacket(info); } catch (JMSException e) { this.session.removeConsumer(this); throw e; } if (session.connection.isStarted()) { start(); } }
this.info.setPrefetchSize(prefetch);
又上面代码可以看出,在创建ActiveMQMessageConsumer的过程中,程序会将connection中的queuePrefetch赋给ActiveMQMessageConsumer对象中的info对象(info为一个ConsumerInfo对象)
Map<String, Object> options = IntrospectionSupport.extractProperties( new HashMap<String, Object>(dest.getOptions()), "consumer."); IntrospectionSupport.setProperties(this.info, options);
在创建队列(queue)的过程中,我们可以传一些参数来配置消费者,这些参数的前缀必须为consumer. ,当我们传的参数与info对象中的属性匹配时,将覆盖info对象中的属性值,其传参形式如下:
queueName?param1=value1¶m2=value2
所以我们如果想改变消费者预先拉取的消息数量,可以在创建对象的时候传入如下参数
queue = session.createQueue("test?consumer.prefetchSize=number");
ActiveMq接收消息--监听器
监听器代码如下:
package com.apt.study.util.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ReceiveListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage msg = (TextMessage) message; if(msg!=null) { System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消息接收类:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; public class Receiver { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private ActiveMQConnection connection; private Session session; private Queue queue; private MessageConsumer consumer; public void init() { try { //创建一个链接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //从工厂中创建一个链接 connection = (ActiveMQConnection) connectionFactory.createConnection(); //启动链接 connection.start(); //创建一个事物session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test"); consumer = session.createConsumer(queue); //设置消息监听器 consumer.setMessageListener(new ReceiveListener()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
consumer.setMessageListener方法:
public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); if (wasRunning) { session.stop(); } this.messageListener.set(listener); session.redispatch(this, unconsumedMessages); if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
从代码中可以看出,当我们使用监听器时,消费者prefetchSize必须大于0
以上是关于ActiveMQ消息发送与接收的主要内容,如果未能解决你的问题,请参考以下文章