如何使 JMS onMessage 方法异步
Posted
技术标签:
【中文标题】如何使 JMS onMessage 方法异步【英文标题】:How to make JMS onMessage method asynchronous 【发布时间】:2021-01-18 05:11:02 【问题描述】:我正在开发一个 JMS POC,它监听一个 IBM MQ。 MQConnectionFactory
已用于与 MQ 连接,并使用 onMessage()
方法监听输入消息。收到消息后,它正在使用MarshallingMessageConverter
转换为所需的对象类型,并在执行更多操作后将其推送到另一个队列(响应)。
到目前为止,它运行良好。但是,似乎队列中的所有消息都是同步消耗的,例如只有在第一条消息处理完成后,第二条消息才进入onMessage()
方法。
第一季度。我怎样才能使其异步以提高性能?
第二季度。是否建议使其异步?
下面是我的代码的 sn-p:
config.xml:
<bean id="oxmMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<property name="marshaller" ref="jaxbMapper" />
<property name="unmarshaller" ref="jaxbMapper" />
<property name="targetType">
<util:constant
static-field="org.springframework.jms.support.converter.MessageType.TEXT" />
</property>
</bean>
<bean id="jmsResponseSenderA" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsMQConnectionFactoryA" />
<property name="defaultDestination" ref="jmsResponseQueue" />
<property name="messageConverter" ref="oxmMessageConverter" />
</bean>
<bean id="jmsMQConnectionFactoryA" class="com.ibm.mq.jms.MQConnectionFactory">
<property name="transportType">
<util:constant
static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CM_CLIENT" />
</property>
<property name="clientReconnectOptions">
<util:constant
static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR" />
</property>
<property name="queueManager" value="$queue.manager.A" />
<property name="CCDTURL" value="$ccdt.url"></property>
</bean>
监听类:
public class ListenerServiceImpl implements MessageListener
public void onMessage(Message message)
// action1
// action2
// action3
jmsResponseSender.convertAndSend(response);
【问题讨论】:
onMessage 每条消息调用一次。您可以考虑部署多个消费者,以便可以使用消息以提高性能。 有什么办法让它异步吗?另外,当您说多个消费者时,您的意思是从多个队列中读取消息? 当Shashi说“多个消费者”时,他的意思是来自同一个队列,即让他们同时处理队列。您要求将其设为“异步”,但在 Message 上已经是异步的。您的问题似乎要求的是并发性。如果这不是您的意思,也许您可以详细说明在这种情况下“异步”对您意味着什么? 你的监听器配置在哪里? 【参考方案1】:JMS MessageListener
实现的onMessage()
方法已经是异步的。您的问题是您只有 1 个MessageConsumer
。您需要配置多个消费者,以便他们可以使用自己的MessageListener
同时消费消息。
【讨论】:
【参考方案2】:看起来您正在使用spring-jms
。如果您还使用mq-jms-spring-boot-starter
,那么您可以配置连接池。可以设置的属性在https://github.com/ibm-messaging/mq-jms-spring
对于您的情况,假设您想要一个包含 5 个连接的池:
ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=5
当你有池时,你需要确保你的onMessage
方法是可重入兼容的。在我的测试中,我有一个 @JmsListener
并设置了并发,并且处于休眠状态,因此我可以看到并行处理。
@JmsListener(destination = "$queue.name", concurrency = "3-5")
public void receiveString(String comment)
String fid = Integer.toString(rand.nextInt(99));
logger.info(fid);
logger.info(fid + " ========================================");
logger.info(fid + " Received string message is: " + comment);
logger.info(fid + " Sleeping");
try
Thread.sleep(10000);
catch (InterruptedException e)
logger.info(fid + " Waking" );
logger.info(fid + " ========================================");
【讨论】:
以上是关于如何使 JMS onMessage 方法异步的主要内容,如果未能解决你的问题,请参考以下文章