如何将 MQ 服务器回复消息与正确的请求相匹配
Posted
技术标签:
【中文标题】如何将 MQ 服务器回复消息与正确的请求相匹配【英文标题】:How to match MQ Server reply messages to the correct request 【发布时间】:2011-06-28 14:55:51 【问题描述】:我正在连接到 IBM Websphere MQ。我希望能够将回复消息与正确的请求消息相匹配。我翻了数百页才得到这个,但没有运气。
我有一个类 - MQHandler - 它向一个定义的队列发送消息,并从另一个队列读取请求。这很好用,但是,如果多个用户同时使用该应用程序,消息就会混淆。
我似乎无法在接收器上找到一种方法来指示要匹配的 CorrelationID。 比如……
consumer.receive(选择器);
您可以检查以下方法以确保我正确执行此操作吗?
/**
* When the class is called, this initialisation is done first.
*
* @throws JMSException
*/
public void init() throws JMSException
// Create a connection factory
JmsFactoryFactory ff;
try
ff = JmsFactoryFactory.getInstance( WMQConstants.WMQ_PROVIDER );
cf = ff.createConnectionFactory();
// Set the properties
cf.setStringProperty( WMQConstants.WMQ_HOST_NAME, hostServer );
cf.setIntProperty( WMQConstants.WMQ_PORT, 1414 );
cf.setStringProperty( WMQConstants.WMQ_CHANNEL, channel );
cf.setIntProperty( WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT );
cf.setStringProperty( WMQConstants.WMQ_QUEUE_MANAGER, qManager );
connection = cf.createConnection();
session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
catch( JMSException e )
throw e;
// end of init
/**
* @param request
* @return
* @throws JMSException
*/
private String sendRequest( String request ) throws JMSException
// Create JMS objects
Destination destination = session.createQueue( "queue:///" + writeQueueName );
// Enable write of MQMD fields. See documentation for further
// details.
((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_WRITE_ENABLED, true );
// Set message context, if needed. See comment at the top.
// Create a producer
MessageProducer producer = session.createProducer( destination );
// Create a message
TextMessage message = session.createTextMessage( request );
// Generate a custom message id
message.setJMSCorrelationID( generateRandomID() );
// Start the connection
connection.start();
// And, send the message
producer.send( message );
System.out.println(message);
return message.getJMSCorrelationID();
/**
* @param customMessageId
* @return
* @throws JMSException
*/
private String recvResponse( String customMessageId ) throws JMSException
Destination destination = session.createQueue( "queue:///" + readQueueName );
// Enable read of MQMD fields.
((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_READ_ENABLED, true );
((JmsDestination) destination).setObjectProperty( WMQConstants.JMS_IBM_MQMD_CORRELID, customMessageId );
// Create a consumer
MessageConsumer consumer = session.createConsumer( destination );
// Start the connection
connection.start();
// And, receive a message from the queue
TextMessage receivedMessage = (TextMessage)consumer.receive( 15000 );
connection.close();
session.close();
return receivedMessage.getText();
这里是main方法的sn-p...
try
String customMessageId;
init();
customMessageId = sendRequest( request );
return recvResponse( customMessageId );
catch( Exception ex )
System.out.println( "Error on MQ." );
throw new Exception( "\n\n*** An error occurred ***\n\n" + ex.getLocalizedMessage()
+ "\n\n**********************************" );
【问题讨论】:
【参考方案1】:QueueReceiver queueReceiver =
session.createReceiver(destination, "JMSCorrelationID='customMessageId'");
TextMessage receivedMessage = (TextMessage)queueReceiver.receive( 15000 );
在我的示例中,customMessageId 应该包含您之前设置的实际值。
另外,我见过很多情况,人们生成一个correlationID 并将其设置在出站消息中,期望能够根据该值选择响应。执行此操作的教科书方法是服务提供商应用程序在响应时将消息 ID 复制到相关 ID。请求者仍将 JMSCorrelationID 指定为选择器,但将使用原始 JMSMessageID 作为值。由于 JMSMessageID 即使在 QMgrs 中也保证是唯一的,因此您在此值上发生冲突的可能性要小得多。您需要确保您的客户与服务提供商的行为相匹配,即哪些值被复制到相关 ID 中。
【讨论】:
【参考方案2】:这可能是一个临时队列的用例,它只与创建它的连接相关联。
onJava上有一篇详细的文章,Designing Messaging Applications with Temporary Queues
“临时目的地(temporary queues 或临时主题)是 建议作为轻量级替代品 在可扩展的系统架构中, 可以用作独特的目的地 回复。这样的目的地有 范围仅限于连接 创建它,并在 服务器端只要连接 关门了。”
Java 文档说明:
您可以使用临时目的地 实现一个简单的请求/回复 机制。如果你创建一个临时 目的地并将其指定为 JMSReplyTo 消息头的值 发送消息时的字段, 消息的消费者可以使用 JMSReplyTo 字段的值作为 它发送回复的目的地 也可以参考原文 通过设置请求 JMSCorrelationID 头域 回复消息的值 JMSMessageID 头域 请求。
【讨论】:
以上是关于如何将 MQ 服务器回复消息与正确的请求相匹配的主要内容,如果未能解决你的问题,请参考以下文章
Rabbit MQ 阻止调用以发送消息并确保它为超出消息限制或超出消息大小限制提供正确的回复代码