Set timeOut for session/transaction to DefaultMessageListenerContainer

[Posted]: 2019-09-09 01:14:58

[Question]:

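I want to use Spring's DefaultMessageListenerContainer (by overriding doReceiveAndExecute), as described in the link, to receive messages from a queue in batches within a time limit (for example, 300 ms after the first message is received).

When there are many messages in the queue, I can group them into batches of the batch size, i.e. 20; when there are very few messages in the queue, I receive fewer than 20.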
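The time-boxed batching described above can be sketched without any JMS dependency. This is only an illustration under stated assumptions: a `BlockingQueue` stands in for the JMS `MessageConsumer`, and the class name `TimeBoxedBatcher` and its `collect` method are hypothetical, not part of Spring or of the code in this question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch only: a BlockingQueue stands in for the JMS MessageConsumer. */
class TimeBoxedBatcher {

    /**
     * Blocks until a first message arrives, then keeps collecting until either
     * maxBatch messages are held or windowMillis have elapsed since the first one.
     */
    static <T> List<T> collect(BlockingQueue<T> source, int maxBatch, long windowMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        batch.add(source.take()); // wait indefinitely for the first message
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        while (batch.size() < maxBatch) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // the window is used up
            }
            T next = source.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // the window expired while waiting
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add("msg-" + i);
        }
        System.out.println(collect(queue, 20, 300).size()); // prints 20 (full batch)
        System.out.println(collect(queue, 20, 300).size()); // prints 5 (window expired)
    }
}
```

The point of the single deadline is that the total wait after the first message is bounded by the window, no matter how many individual receive calls are made inside it.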

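Problem:

I see that delivering the messages to the listener takes too long (sometimes 1 second, sometimes 2 seconds or more), even when the queue is full.

When I use a plain DefaultMessageListenerContainer to receive single messages concurrently, I see a receive latency of only a few milliseconds (1 ms, or 30 to 60 ms at most).

I have not specified a transactionTimeout or receiveTimeout, and I have not wired in any transactionManager.

Springers, can you help me find where I can specify the timeout, or how to reduce the delay?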

BatchMessageListenerContainer:

package com.mypackage;

import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;

import java.util.ArrayList;
import java.util.List;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.TransactionStatus;

/**
 * Listener Container that allows batch consumption of messages. Works only with transacted sessions
 */
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
  public static final int DEFAULT_BATCH_SIZE = 20;

  private int batchSize = DEFAULT_BATCH_SIZE;

  public BatchMessageListenerContainer() {
    super();
    setSessionTransacted(true);
  }

  /**
   * @return The batch size on this container
   */
  public int getBatchSize() {
    return batchSize;
  }

  /**
   * @param batchSize The batchSize of this container
   */
  public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  /**
   * The doReceiveAndExecute() method has to be overridden to support multiple-message receives.
   */
  @Override
  protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
    TransactionStatus status) throws JMSException {

    Connection conToClose = null;
    MessageConsumer consumerToClose = null;
    Session sessionToClose = null;

    try {
      Session sessionToUse = session;
      MessageConsumer consumerToUse = consumer;

      if (sessionToUse == null) {
        Connection conToUse = null;
        if (sharedConnectionEnabled()) {
          conToUse = getSharedConnection();
        }
        else {
          conToUse = createConnection();
          conToClose = conToUse;
          conToUse.start();
        }
        sessionToUse = createSession(conToUse);
        sessionToClose = sessionToUse;
      }

      if (consumerToUse == null) {
        consumerToUse = createListenerConsumer(sessionToUse);
        consumerToClose = consumerToUse;
      }

      List<Message> messages = new ArrayList<Message>();

      int count = 0;
      Message message = null;
      // Attempt to receive messages with the consumer
      do {
        message = receiveMessage(consumerToUse);
        if (message != null) {
          messages.add(message);
        }
      }
      // Exit loop if no message was received in the time out specified, or
      // if the max batch size was met
      while ((message != null) && (++count < batchSize));

      if (messages.size() > 0) {
        // Only if messages were collected, notify the listener to consume the same.
        try {
          doExecuteListener(sessionToUse, messages);
          sessionToUse.commit();
        }
        catch (Throwable ex) {
          handleListenerException(ex);
          if (ex instanceof JMSException) {
            throw (JMSException) ex;
          }
        }
        return true;
      }

      // No message was received for the period of the timeout, return false.
      noMessageReceived(invoker, sessionToUse);
      return false;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumerToClose);
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
    }
  }

  protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
    if (!isAcceptMessagesWhileStopping() && !isRunning()) {
      if (logger.isWarnEnabled()) {
        logger.warn("Rejecting received messages because of the listener container "
          + "having been stopped in the meantime: " + messages);
      }
      rollbackIfNecessary(session);
      throw new JMSException("Rejecting received messages as listener container is stopping");
    }

    @SuppressWarnings("unchecked")
    SessionAwareBatchMessageListener<Message> lsnr = (SessionAwareBatchMessageListener<Message>) getMessageListener();

    try {
      lsnr.onMessages(session, messages);
    }
    catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (Error err) {
      rollbackOnExceptionIfNecessary(session, err);
      throw err;
    }
  }

  @Override
  protected void checkMessageListener(Object messageListener) {
    if (!(messageListener instanceof SessionAwareBatchMessageListener<?>)) {
      throw new IllegalArgumentException("Message listener needs to be of type ["
        + SessionAwareBatchMessageListener.class.getName() + "]");
    }
  }

  @Override
  protected void validateConfiguration() {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
    }
  }

  public void setSessionTransacted(boolean transacted) {
    if (!transacted) {
      throw new IllegalArgumentException("Batch Listener requires a transacted Session");
    }
    super.setSessionTransacted(transacted);
  }
}

SessionAwareBatchMessageListener:

package com.mypackage;

import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

public interface SessionAwareBatchMessageListener<M extends Message> {
  /**
   * Perform a batch action with the provided list of {@code messages}.
   *
   * @param session JMS {@code Session} that received the messages
   * @param messages List of messages
   * @throws JMSException JMSException thrown if there is an error performing the operation.
   */
  public void onMessages(Session session, List<M> messages) throws JMSException;
}

Beans in applicationContext.xml:

<bean id="myMessageListener" class="org.mypackage.MyMessageListener"/>

<bean id="jmsContainer" class="com.mypackage.BatchMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

MyMessageListener:

package org.mypackage;

import java.util.List;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;

import com.mypackage.SessionAwareBatchMessageListener;
import org.mypackage.service.MyService;

public class MyMessageListener implements SessionAwareBatchMessageListener<TextMessage> {

    @Autowired
    private MyService myService;

    @Override
    public void onMessages(Session session, List<TextMessage> messages) {
        try {
            for (TextMessage textMessage : messages) {
                String body = textMessage.getText();
                // parse the message and add to list
            }
            // process list of Objects to DB
        } catch (JMSException e1) {
            e1.printStackTrace();
        }
    }
}

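[Comments]:

It is hard to say what is going wrong when there are enough messages. Maybe you have no prefetch, so receive() makes a round trip to the broker for every message? When you don't have a full batch available, reducing receiveTimeout should help.

[Answer 1]: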

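I think the time spent before the messages reach the consumer is caused by your while loop: on every pass you wait for the list to fill up, but the list is only filled by the current thread, because it is created inside the doReceiveAndExecute method!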

  // Exit loop if no message was received in the time out specified, or
  // if the max batch size was met
  while ((message != null) && (++count < batchSize));
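To make the cost of the quoted loop concrete: each `receiveMessage` call blocks for up to the container's receiveTimeout, which Spring defaults to 1000 ms when it is not set, as in the question. So whenever fewer than batchSize messages are available, the last receive waits out the full timeout before the batch is handed to the listener. The sketch below reproduces only the shape of that loop. It is a simulation under stated assumptions: `BlockingQueue.poll(timeout)` stands in for the JMS receive, and `ReceiveLoopLatency`, `receiveBatch` and `elapsedMillis` are hypothetical names. It may account for delays of about one second on partial batches; it does not by itself explain delays when the queue is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch only: poll(timeout) stands in for MessageConsumer.receive(timeout). */
class ReceiveLoopLatency {

    /** Same shape as the quoted do/while: stop on a timed-out receive or a full batch. */
    static List<String> receiveBatch(BlockingQueue<String> consumer, int batchSize,
            long receiveTimeoutMillis) throws InterruptedException {
        List<String> messages = new ArrayList<>();
        int count = 0;
        String message;
        do {
            message = consumer.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
            if (message != null) {
                messages.add(message);
            }
        } while (message != null && ++count < batchSize);
        return messages;
    }

    /** Returns how long receiveBatch blocked before a batch could reach a listener. */
    static long elapsedMillis(int available, int batchSize, long receiveTimeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> consumer = new LinkedBlockingQueue<>();
        for (int i = 0; i < available; i++) {
            consumer.add("msg-" + i);
        }
        long start = System.nanoTime();
        receiveBatch(consumer, batchSize, receiveTimeoutMillis);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        // A full batch returns almost immediately; a partial one waits out the timeout.
        System.out.println("full batch, 1000 ms timeout:    " + elapsedMillis(20, 20, 1000) + " ms");
        System.out.println("partial batch, 1000 ms timeout: " + elapsedMillis(5, 20, 1000) + " ms");
        System.out.println("partial batch, 50 ms timeout:   " + elapsedMillis(5, 20, 50) + " ms");
    }
}
```

In the real container the corresponding setting is `setReceiveTimeout(long)` on DefaultMessageListenerContainer (or a `receiveTimeout` property on the bean), which is the knob the comment above suggests lowering.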

Maybe something like this would work better:

...
  // Shared by all consumer threads of this container instance
  List<Message> messages = Collections.synchronizedList(new ArrayList<Message>());

  @Override
  protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
    TransactionStatus status) throws JMSException {

    Connection conToClose = null;
    MessageConsumer consumerToClose = null;
    Session sessionToClose = null;

    try {
      Session sessionToUse = session;
      MessageConsumer consumerToUse = consumer;

      if (sessionToUse == null) {
        Connection conToUse = null;
        if (sharedConnectionEnabled()) {
          conToUse = getSharedConnection();
        }
        else {
          conToUse = createConnection();
          conToClose = conToUse;
          conToUse.start();
        }
        sessionToUse = createSession(conToUse);
        sessionToClose = sessionToUse;
      }

      if (consumerToUse == null) {
        consumerToUse = createListenerConsumer(sessionToUse);
        consumerToClose = consumerToUse;
      }

      Message message = null;
      // Attempt to receive messages with the consumer
      do {
        message = receiveMessage(consumerToUse);
        if (message != null) {
          messages.add(message);
        }
      }
      // NOTE: the loop condition is missing from the original post;
      // this one is an assumed reconstruction.
      while ((message != null) && (messages.size() < batchSize));

      if (messages.size() >= batchSize) {
        synchronized (messages) {
          // Only if messages were collected, notify the listener to consume the same.
          try {
            doExecuteListener(sessionToUse, messages);
            sessionToUse.commit();
            // clear the list!!
            messages.clear();
          }
          catch (Throwable ex) {
            handleListenerException(ex);
            if (ex instanceof JMSException) {
              throw (JMSException) ex;
            }
          }
        }
        return true;
      }

      // No message was received for the period of the timeout, return false.
      noMessageReceived(invoker, sessionToUse);
      return false;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumerToClose);
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
    }
  }

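[Comments]:

I will try this and let you know whether it works. 1. I take it that sessionToUse.commit(); can be done by any thread. 2. Any idea how long it takes to finish?

sessionToUse.commit(); is done by the current thread, the one executing the synchronized block, and no other thread can add messages to the list until that thread has finished and committed. So here is a problem I have spotted: the commit only applies to the current thread's session! Messages received through the other threads' sessions will not be committed; you would have to use individual acknowledgement and commit the messages one by one. ***.com/questions/40399495/…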
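The concern raised in the last comment follows from how JMS transactions are scoped: a transacted Session's commit() covers only the messages consumed through that session. The toy model below illustrates the consequence for a list shared between consumer threads. It is a sketch under stated assumptions: `FakeSession` is a hypothetical stand-in for a transacted JMS Session, not a real API, and it does not model redelivery.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: FakeSession is a hypothetical stand-in for a transacted JMS Session. */
class SessionScopedCommit {

    static class FakeSession {
        private final List<String> uncommitted = new ArrayList<>();

        /** Records a message as received inside this session's open transaction. */
        String receive(String message) {
            uncommitted.add(message);
            return message;
        }

        /** Commits only the messages that this session itself received. */
        void commit() {
            uncommitted.clear();
        }

        /** Messages received by this session that are still uncommitted. */
        List<String> pending() {
            return new ArrayList<>(uncommitted);
        }
    }

    public static void main(String[] args) {
        FakeSession sessionA = new FakeSession(); // used by consumer thread A
        FakeSession sessionB = new FakeSession(); // used by consumer thread B

        // Both threads add what they receive to one shared batch.
        List<String> sharedBatch = new ArrayList<>();
        sharedBatch.add(sessionA.receive("a1"));
        sharedBatch.add(sessionB.receive("b1"));

        // Thread A processes the whole shared batch, then commits its own session.
        sessionA.commit();
        sharedBatch.clear();

        System.out.println(sessionA.pending()); // prints []
        System.out.println(sessionB.pending()); // prints [b1]
    }
}
```

In this model "b1" has already been processed as part of the shared batch, yet it is still pending in session B, which is the mismatch the comment describes.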
