JMSorrelationID 过滤 JMS 消息接收方
Posted
技术标签:
【中文标题】JMSorrelationID 过滤 JMS 消息接收方【英文标题】:JMS message receiver filtering by JMSCorrelationID 【发布时间】:2010-09-14 00:34:57 【问题描述】:如何在 java (JRE /JDK / J2EE 1.4) 中实例化只接收与给定 JMSCorrelationID 匹配的消息的 JMS 队列侦听器?我要获取的消息已发布到队列而不是主题,尽管如果需要可以更改。
这是我目前用来将消息放入队列的代码:/**
* publishResponseToQueue publishes Requests to the Queue.
*
* @param jmsQueueFactory -Name of the queue-connection-factory
* @param jmsQueue -The queue name for the request
* @param response -A response object that needs to be published
*
* @throws ServiceLocatorException -An exception if a request message
* could not be published to the Topic
*/
private void publishResponseToQueue( String jmsQueueFactory,
String jmsQueue,
Response response )
throws ServiceLocatorException
if ( logger.isInfoEnabled() )
logger.info( "Begin publishRequestToQueue: " +
jmsQueueFactory + "," + jmsQueue + "," + response );
logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
"jmsQueue cannot be null" );
logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
"jmsQueueFactory cannot be null" );
logger.assertLog( response != null, "Request cannot be null" );
try
Queue queue = (Queue)_context.lookup( jmsQueue );
QueueConnectionFactory factory = (QueueConnectionFactory)
_context.lookup( jmsQueueFactory );
QueueConnection connection = factory.createQueueConnection();
connection.start();
QueueSession session = connection.createQueueSession( false,
QueueSession.AUTO_ACKNOWLEDGE );
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setJMSCorrelationID(response.getID());
objectMessage.setObject( response );
session.createSender( queue ).send( objectMessage );
session.close();
connection.close();
catch ( Exception e )
//XC3.2 Added/Modified BEGIN
logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
"Response to the Queue - " + e.getMessage() );
throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
"- Could not publish the " +
"Response to the Queue - " + e.getMessage() );
//XC3.2 Added/Modified END
if ( logger.isInfoEnabled() )
logger.info( "End publishResponseToQueue: " +
jmsQueueFactory + "," + jmsQueue + response );
// end of publishResponseToQueue method
【问题讨论】:
【参考方案1】:希望这会有所帮助。我使用了 Open MQ。
package com.MQueues;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;
public class HelloProducerConsumer
public static String queueName = "queue0";
public static String correlationId;
public static String getCorrelationId()
return correlationId;
public static void setCorrelationId(String correlationId)
HelloProducerConsumer.correlationId = correlationId;
public static String getQueueName()
return queueName;
public static void sendMessage(String threadName)
correlationId = UUID.randomUUID().toString();
try
// Start connection
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(threadName);
MessageProducer producer = session.createProducer(destination);
connection.start();
// create message to send
TextMessage message = session.createTextMessage();
message.setJMSCorrelationID(correlationId);
message.setText(threadName + "(" + System.currentTimeMillis()
+ ") " + correlationId +" from Producer");
System.out.println(correlationId +" Send from Producer");
producer.send(message);
// close everything
producer.close();
session.close();
connection.close();
catch (JMSException ex)
System.out.println("Error = " + ex.getMessage());
public static void receivemessage(final String correlationId)
try
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());
connection.start();
System.out.println("\n");
System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
long now = System.currentTimeMillis();
// receive our message
String filter = "JMSCorrelationID = '" + correlationId + "'";
QueueReceiver receiver = session.createReceiver(destination, filter);
TextMessage m = (TextMessage) receiver.receive();
System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());
System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
session.close();
connection.close();
catch (JMSException ex)
System.out.println("Error = " + ex.getMessage());
public static void main(String args[])
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId1 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId2 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId3 = getCorrelationId();
HelloProducerConsumer.receivemessage(correlationId2);
HelloProducerConsumer.receivemessage(correlationId1);
HelloProducerConsumer.receivemessage(correlationId3);
【讨论】:
【参考方案2】:String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);
这里接收者将收到JMSCorrelationID
等于MessageID
的消息。这在请求/响应范例中非常有帮助。
或者您可以直接将其设置为任何值:
QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";);
你可以做到receiver.receive(2000);
或receiver.setMessageListener(this);
【讨论】:
【参考方案3】:顺便说一句,虽然这不是您提出的实际问题 - 如果您尝试通过 JMS 实现请求响应,我建议您使用 reading this article,因为 JMS API 比您想象的要复杂得多,并且有效地执行此操作要多得多比看起来更难。
尤其是to use JMS efficiently,您应该尽量避免为单个消息等创建消费者。
另外,由于 JMS API 非常复杂,无法正确有效地使用 - 特别是在池、事务和并发处理方面 - 我推荐人们 hide the middleware from their application code,例如通过使用 Apache Camel's Spring Remoting implementation for JMS
【讨论】:
如果我几年前知道 Camel,我会为自己节省很多车轮改造。【参考方案4】:队列连接设置是相同的,但是一旦有了 QueueSession,就可以在创建接收器时设置选择器。
QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");
然后
receiver.receive()
或
receiver.setListener(myListener);
【讨论】:
我最近一直在阅读同一个主题,并且有一个问题如下:接收者是否仍会收到那些不包含所需关联 ID 的消息并在不处理的情况下静默丢弃它们,或者JMS 提供者本身是否不会将此类消息传递给接收者,以便它们仍然保留在队列中?我觉得后者是正确的做法,但想验证一下。谢谢。 @Robin ... 在这种情况下,过滤条件是 "JMSCorrelationID='theid'" 。我需要再添加一个条件,例如“JMSCorrelationID='theid'”和“Location=ASIA”。这个多重过滤条件的语法是什么以上是关于JMSorrelationID 过滤 JMS 消息接收方的主要内容,如果未能解决你的问题,请参考以下文章