如何使 JMS onMessage 方法异步

Posted

技术标签:

【中文标题】如何使 JMS onMessage 方法异步【英文标题】:How to make JMS onMessage method asynchronous 【发布时间】:2021-01-18 05:11:02 【问题描述】:

我正在开发一个 JMS POC,它监听一个 IBM MQ。 MQConnectionFactory 已用于与 MQ 连接,并使用 onMessage() 方法监听输入消息。收到消息后,它正在使用MarshallingMessageConverter 转换为所需的对象类型,并在执行更多操作后将其推送到另一个队列(响应)。

到目前为止,它运行良好。但是,似乎队列中的所有消息都是同步消耗的,例如只有在第一条消息处理完成后,第二条消息才进入onMessage()方法。

第一季度。我怎样才能使其异步以提高性能?

第二季度。是否建议使其异步?

下面是我的代码的 sn-p:

config.xml:

<bean id="oxmMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <property name="marshaller" ref="jaxbMapper" />
    <property name="unmarshaller" ref="jaxbMapper" />
    <property name="targetType">
        <util:constant
            static-field="org.springframework.jms.support.converter.MessageType.TEXT" />
    </property>
</bean>

<bean id="jmsResponseSenderA" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsMQConnectionFactoryA" />
    <property name="defaultDestination" ref="jmsResponseQueue" />
    <property name="messageConverter" ref="oxmMessageConverter" />
</bean>

<bean id="jmsMQConnectionFactoryA" class="com.ibm.mq.jms.MQConnectionFactory">
    <property name="transportType">
        <util:constant
            static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CM_CLIENT" />
    </property>
    <property name="clientReconnectOptions">
        <util:constant
            static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR" />
    </property>
    <property name="queueManager" value="$queue.manager.A" />
    <property name="CCDTURL" value="$ccdt.url"></property>
            
</bean>

监听类:

public class ListenerServiceImpl implements MessageListener 
  public void onMessage(Message message) 
     // action1
     // action2
     // action3
    jmsResponseSender.convertAndSend(response);
  

【问题讨论】:

onMessage 每条消息调用一次。您可以考虑部署多个消费者,以便可以使用消息以提高性能。 有什么办法让它异步吗?另外,当您说多个消费者时,您的意思是从多个队列中读取消息? 当Shashi说“多个消费者”时,他的意思是来自同一个队列,即让他们同时处理队列。您要求将其设为“异步”,但在 Message 上已经是异步的。您的问题似乎要求的是并发性。如果这不是您的意思,也许您可​​以详细说明在这种情况下“异步”对您意味着什么? 你的监听器配置在哪里? 【参考方案1】:

JMS MessageListener 实现的onMessage() 方法已经是异步的。您的问题是您只有 1 个MessageConsumer。您需要配置多个消费者,以便他们可以使用自己的MessageListener 同时消费消息。

【讨论】:

【参考方案2】:

看起来您正在使用spring-jms。如果您还使用mq-jms-spring-boot-starter,那么您可以配置连接池。可以设置的属性在https://github.com/ibm-messaging/mq-jms-spring

对于您的情况,假设您想要一个包含 5 个连接的池:

ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=5

当你有池时,你需要确保你的onMessage 方法是可重入兼容的。在我的测试中,我有一个 @JmsListener 并设置了并发,并且处于休眠状态,因此我可以看到并行处理。

    @JmsListener(destination = "$queue.name", concurrency = "3-5") 
    public void receiveString(String comment) 
        String fid = Integer.toString(rand.nextInt(99));

        logger.info(fid);
        logger.info(fid + " ========================================");
        logger.info(fid + " Received string message is: " + comment);

        logger.info(fid + " Sleeping");

        try 
            Thread.sleep(10000);
         catch (InterruptedException e)  
        logger.info(fid + " Waking" );

        logger.info(fid + " ========================================");
    

【讨论】:

以上是关于如何使 JMS onMessage 方法异步的主要内容,如果未能解决你的问题,请参考以下文章

如何异步消费ServiceStack的ServerEventsClient OnMessage

从 JMS 队列中批量获取

获取JMS消息接收时间

onMessage 在 iOS 上运行多个异步任务

如何尝试在并发环境中重新启动() JMS 连接?

Apache CXF JMS - SOAP