JMSorrelationID 过滤 JMS 消息接收方

Posted

技术标签:

【中文标题】JMSorrelationID 过滤 JMS 消息接收方【英文标题】:JMS message receiver filtering by JMSCorrelationID 【发布时间】:2010-09-14 00:34:57 【问题描述】:

如何在 java (JRE /JDK / J2EE 1.4) 中实例化只接收与给定 JMSCorrelationID 匹配的消息的 JMS 队列侦听器?我要获取的消息已发布到队列而不是主题,尽管如果需要可以更改。

这是我目前用来将消息放入队列的代码:
/**
 * publishResponseToQueue publishes Requests to the Queue.
 *
 * @param   jmsQueueFactory             -Name of the queue-connection-factory
 * @param   jmsQueue                    -The queue name for the request
 * @param   response                     -A response object that needs to be published
 * 
 * @throws  ServiceLocatorException     -An exception if a request message
 *                                      could not be published to the Topic
 */
private void publishResponseToQueue( String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response )
        throws ServiceLocatorException 

    if ( logger.isInfoEnabled() ) 
        logger.info( "Begin publishRequestToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    
    logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
                      "jmsQueue cannot be null" );
    logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
                      "jmsQueueFactory cannot be null" );
    logger.assertLog( response != null, "Request cannot be null" );

    try 

        Queue queue = (Queue)_context.lookup( jmsQueue );

        QueueConnectionFactory factory = (QueueConnectionFactory)
            _context.lookup( jmsQueueFactory );

        QueueConnection connection = factory.createQueueConnection();
        connection.start();
        QueueSession session = connection.createQueueSession( false,
                                    QueueSession.AUTO_ACKNOWLEDGE );

        ObjectMessage objectMessage = session.createObjectMessage();

        objectMessage.setJMSCorrelationID(response.getID());

        objectMessage.setObject( response );

        session.createSender( queue ).send( objectMessage );

        session.close();
        connection.close();

     catch ( Exception e ) 
        //XC3.2  Added/Modified BEGIN
        logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
                                           "- Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        //XC3.2  Added/Modified END
    

    if ( logger.isInfoEnabled() ) 
        logger.info( "End publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + response );
    

  // end of publishResponseToQueue method 

【问题讨论】:

【参考方案1】:

希望这会有所帮助。我使用了 Open MQ。

package com.MQueues;

import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;

public class HelloProducerConsumer 

public static String queueName = "queue0";
public static String correlationId;

public static String getCorrelationId() 
    return correlationId;


public static void setCorrelationId(String correlationId) 
    HelloProducerConsumer.correlationId = correlationId;


public static String getQueueName() 
    return queueName;


public static void sendMessage(String threadName) 
    correlationId = UUID.randomUUID().toString();
    try 

        // Start connection
        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        BasicQueue destination = (BasicQueue) session.createQueue(threadName);
        MessageProducer producer = session.createProducer(destination);
        connection.start();

        // create message to send
        TextMessage message = session.createTextMessage();
        message.setJMSCorrelationID(correlationId);
        message.setText(threadName + "(" + System.currentTimeMillis() 
                + ") " + correlationId +" from Producer");

        System.out.println(correlationId +" Send from Producer");
        producer.send(message);

        // close everything
        producer.close();
        session.close();
        connection.close();

     catch (JMSException ex) 
        System.out.println("Error = " + ex.getMessage());
    


public static void receivemessage(final String correlationId) 
    try 

        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());

        connection.start();

        System.out.println("\n");
        System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
        long now = System.currentTimeMillis();

        // receive our message
        String filter = "JMSCorrelationID = '" + correlationId  + "'";
        QueueReceiver receiver = session.createReceiver(destination, filter);
        TextMessage m = (TextMessage) receiver.receive();
        System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());

        System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");

        session.close();
        connection.close();

     catch (JMSException ex) 
        System.out.println("Error = " + ex.getMessage());
    


public static void main(String args[]) 
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId1 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId2 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId3 = getCorrelationId();


    HelloProducerConsumer.receivemessage(correlationId2);

    HelloProducerConsumer.receivemessage(correlationId1);

    HelloProducerConsumer.receivemessage(correlationId3);


【讨论】:

【参考方案2】:
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);

这里接收者将收到JMSCorrelationID 等于MessageID 的消息。这在请求/响应范例中非常有帮助。

或者您可以直接将其设置为任何值:

QueueReceiver receiver = session.createReceiver(queue,  "JMSCorrelationID ='"+id+"'";);

你可以做到receiver.receive(2000);receiver.setMessageListener(this);

【讨论】:

【参考方案3】:

顺便说一句,虽然这不是您提出的实际问题 - 如果您尝试通过 JMS 实现请求响应,我建议您使用 reading this article,因为 JMS API 比您想象的要复杂得多,并且有效地执行此操作要多得多比看起来更难。

尤其是to use JMS efficiently,您应该尽量避免为单个消息等创建消费者。

另外,由于 JMS API 非常复杂,无法正确有效地使用 - 特别是在池、事务和并发处理方面 - 我推荐人们 hide the middleware from their application code,例如通过使用 Apache Camel's Spring Remoting implementation for JMS

【讨论】:

如果我几年前知道 Camel,我会为自己节省很多车轮改造。【参考方案4】:

队列连接设置是相同的,但是一旦有了 QueueSession,就可以在创建接收器时设置选择器。

    QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");

然后

receiver.receive()

receiver.setListener(myListener);

【讨论】:

我最近一直在阅读同一个主题,并且有一个问题如下:接收者是否仍会收到那些不包含所需关联 ID 的消息并在不处理的情况下静默丢弃它们,或者JMS 提供者本身是否不会将此类消息传递给接收者,以便它们仍然保留在队列中?我觉得后者是正确的做法,但想验证一下。谢谢。 @Robin ... 在这种情况下,过滤条件是 "JMSCorrelationID='theid'" 。我需要再添加一个条件,例如“JMSCorrelationID='theid'”和“Location=ASIA”。这个多重过滤条件的语法是什么

以上是关于JMSorrelationID 过滤 JMS 消息接收方的主要内容,如果未能解决你的问题,请参考以下文章

JMS中间件ActiveMQ详解

JMS 主题与选择器

在运行时创建 JMS 队列 [关闭]

JMS 生产者最佳实践 [关闭]

JMS基本概念

ActiveMQ——理解和掌握JMS