将会话/事务的超时设置为 DefaultMessageListenerContainer
Posted
技术标签:
【中文标题】将会话/事务的超时设置为 DefaultMessageListenerContainer【英文标题】:Set timeOut for session/transaction to DefaultMessageListenerContainer 【发布时间】:2019-09-09 01:14:58 【问题描述】:我想使用 link 中提到的 Spring 的 DefaultMessageListenerContainer(通过覆盖 doReceiveAndExecute)在某个 timeLimit(例如:收到第一条消息后 300 毫秒)内从队列接收大量消息。
当队列中有太多消息时,我可以将批量大小的消息分组,即 20 条,而当队列中的消息非常少时,我可以接收不到 20 条消息。
问题:
我发现即使队列已满,将消息发送到侦听器也需要太多时间(有时 1 秒,有时 2 秒甚至更多)。
当我尝试使用 DefaultMessageListenerContainer 每次只接收单条消息时,我看到消息的接收延迟只有几毫秒(如 1 毫秒,最多 30 到 60 毫秒)。
我没有指定 transactionTimeout 或 receiveTimeout,也没有链接任何 transactionManager。
Springers 能否帮我找到可以指定 timeOut 的位置或如何减少时间延迟?
BatchMessageListenerContainer:
package com.mypackage;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.List;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.TransactionStatus;
/**
* Listener Container that allows batch consumption of messages. Works only with transacted sessions
*/
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer
public static final int DEFAULT_BATCH_SIZE = 20;
private int batchSize = DEFAULT_BATCH_SIZE;
public BatchMessageListenerContainer()
super();
setSessionTransacted(true);
/**
* @return The batch size on this container
*/
public int getBatchSize()
return batchSize;
/**
* @param batchSize The batchSize of this container
*/
public void setBatchSize(int batchSize)
this.batchSize = batchSize;
/**
* The doReceiveAndExecute() method has to be overriden to support multiple-message receives.
*/
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
TransactionStatus status) throws JMSException
Connection conToClose = null;
MessageConsumer consumerToClose = null;
Session sessionToClose = null;
try
Session sessionToUse = session;
MessageConsumer consumerToUse = consumer;
if (sessionToUse == null)
Connection conToUse = null;
if (sharedConnectionEnabled())
conToUse = getSharedConnection();
else
conToUse = createConnection();
conToClose = conToUse;
conToUse.start();
sessionToUse = createSession(conToUse);
sessionToClose = sessionToUse;
if (consumerToUse == null)
consumerToUse = createListenerConsumer(sessionToUse);
consumerToClose = consumerToUse;
List<Message> messages = new ArrayList<Message>();
int count = 0;
Message message = null;
// Attempt to receive messages with the consumer
do
message = receiveMessage(consumerToUse);
if (message != null)
messages.add(message);
// Exit loop if no message was received in the time out specified, or
// if the max batch size was met
while ((message != null) && (++count < batchSize));
if (messages.size() > 0)
// Only if messages were collected, notify the listener to consume the same.
try
doExecuteListener(sessionToUse, messages);
sessionToUse.commit();
catch (Throwable ex)
handleListenerException(ex);
if (ex instanceof JMSException)
throw (JMSException) ex;
return true;
// No message was received for the period of the timeout, return false.
noMessageReceived(invoker, sessionToUse);
return false;
finally
JmsUtils.closeMessageConsumer(consumerToClose);
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
protected void doExecuteListener(Session session, List<Message> messages) throws JMSException
if (!isAcceptMessagesWhileStopping() && !isRunning())
if (logger.isWarnEnabled())
logger.warn("Rejecting received messages because of the listener container "
+ "having been stopped in the meantime: " + messages);
rollbackIfNecessary(session);
throw new JMSException("Rejecting received messages as listener container is stopping");
@SuppressWarnings("unchecked")
SessionAwareBatchMessageListener<Message> lsnr = (SessionAwareBatchMessageListener<Message>) getMessageListener();
try
lsnr.onMessages(session, messages);
catch (JMSException ex)
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
catch (RuntimeException ex)
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
catch (Error err)
rollbackOnExceptionIfNecessary(session, err);
throw err;
@Override
protected void checkMessageListener(Object messageListener)
if (!(messageListener instanceof SessionAwareBatchMessageListener<?>))
throw new IllegalArgumentException("Message listener needs to be of type ["
+ SessionAwareBatchMessageListener.class.getName() + "]");
@Override
protected void validateConfiguration()
if (batchSize <= 0)
throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
public void setSessionTransacted(boolean transacted)
if (!transacted)
throw new IllegalArgumentException("Batch Listener requires a transacted Session");
super.setSessionTransacted(transacted);
SessionAwareBatchMessageListener:
package com.mypackage;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
public interface SessionAwareBatchMessageListener<M extends Message>
/**
* Perform a batch action with the provided list of @code messages.
*
* @param session JMS @code Session that received the messages
* @param messages List of messages
* @throws JMSException JMSException thrown if there is an error performing the operation.
*/
public void onMessages(Session session, List<M> messages) throws JMSException;
applicationContext.xml 中的 Bean:
<!-- Batch listener bean. The element is self-closed so that the container bean below is a
     sibling definition instead of ending up inside an unterminated <bean> element. -->
<bean id="myMessageListener" class="org.mypackage.MyMessageListener"/>

<!-- Batch-consuming listener container wired to the listener above. -->
<bean id="jmsContainer" class="com.mypackage.BatchMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- NOTE(review): destinationName is injected through ref=. Confirm that the "queue" bean
         is a String holding the queue name; if it is a javax.jms.Destination bean, the
         "destination" property is presumably the one intended. -->
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name="concurrentConsumers" value="10"/>
    <property name="maxConcurrentConsumers" value="50"/>
</bean>
MyMessageListener:
package org.mypackage;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.mypackage.service.MyService;
import org.springframework.beans.factory.annotation.Autowired;
import com.mypackage.SessionAwareBatchMessageListener;
public class MyMessageListener implements SessionAwareBatchMessageListener<TextMessage>
@Autowired
private MyService myService;
@Override
public void onMessage(Session session, List<TextMessage> messages)
try
for(TextMessage tm :messages)
TextMessage textMessage = (TextMessage) message;
// parse the message and add to list
//process list of Objects to DB
catch (JMSException e1)
e1.printStackTrace();
【问题讨论】:
即使队列中有足够多的消息,也很难说问题出在哪里。也许您没有启用预取(prefetch),导致 receive()
每接收一条消息都要往返代理(broker)一次?当凑不满一个完整批次时,减小 receiveTimeout
应该会有所帮助。
【参考方案1】:
我认为消息送达消费者之前的延迟是由您的 while 循环造成的:每次都要等到列表填满才继续,而这个列表是在 doReceiveAndExecute 方法内部创建的,因此只能由当前线程填充!
// Exit loop if no message was received in the time out specified, or // if the max batch size was met while ((message != null) && (++count < batchSize));
也许这可以做得很好:
...
// Batch buffer declared outside doReceiveAndExecute, i.e. not local to a single call.
// NOTE(review): presumably an instance field of the container; if several consumer threads
// run doReceiveAndExecute on the same container, they all add to this one list. Confirm
// against the enclosing class, which is not shown in this snippet.
List<Message> messages = Collections.synchronizedList(new ArrayList<Message>());
/**
 * Variant of doReceiveAndExecute that collects received messages in the list declared
 * above and calls the listener once that list holds at least batchSize entries.
 */
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
TransactionStatus status) throws JMSException
Connection conToClose = null;
MessageConsumer consumerToClose = null;
Session sessionToClose = null;
try
Session sessionToUse = session;
MessageConsumer consumerToUse = consumer;
// No session supplied: obtain a connection (shared or newly created) and open a session.
if (sessionToUse == null)
Connection conToUse = null;
if (sharedConnectionEnabled())
conToUse = getSharedConnection();
else
conToUse = createConnection();
conToClose = conToUse;
conToUse.start();
sessionToUse = createSession(conToUse);
sessionToClose = sessionToUse;
// No consumer supplied: create one on the session chosen above.
if (consumerToUse == null)
consumerToUse = createListenerConsumer(sessionToUse);
consumerToClose = consumerToUse;
Message message = null;
// Attempt to receive messages with the consumer
// NOTE(review): no closing "while (...)" for this "do" is visible in the snippet.
do
message = receiveMessage(consumerToUse);
if (message != null)
messages.add(message);
// NOTE(review): the next line has an unbalanced ")" and does not compile as written.
if (messages.size() >= batchSize))
synchronized (messages)
// Only if messages were collected, notify the listener to consume the same.
try
doExecuteListener(sessionToUse, messages);
// Commits sessionToUse only.
// NOTE(review): if the list also holds messages received on other sessions, this commit
// would not cover them - verify before relying on this approach.
sessionToUse.commit();
// clear the list!!
messages.clear();
catch (Throwable ex)
handleListenerException(ex);
if (ex instanceof JMSException)
throw (JMSException) ex;
return true;
// No message was received for the period of the timeout, return false.
noMessageReceived(invoker, sessionToUse);
return false;
finally
// Release only what this call created; supplied session/consumer are left untouched.
JmsUtils.closeMessageConsumer(consumerToClose);
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
【讨论】:
我会试试这个,让你知道它是否有效。 1.我这样认为 sessionToUse.commit();可以由任何线程完成。 2. 知道需要多长时间才能结束吗? sessionToUse.commit();由执行同步块的当前线程完成,另一个线程不能将消息添加到列表中,直到第一个完成并提交。所以这是我检测到的一个问题,提交只在当前线程的会话中进行!另一个线程会话的其他消息将不会被提交,您必须逐个使用单独的确认和提交消息。 ***.com/questions/40399495/…以上是关于将会话/事务的超时设置为 DefaultMessageListenerContainer的主要内容,如果未能解决你的问题,请参考以下文章