如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?
Posted
技术标签:
【中文标题】如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?【英文标题】:How to simulate message redelivery in AUTO_ACKNOWLEDGE JMS Session Scenario? 【发布时间】:2012-04-09 21:27:40 【问题描述】:在以下测试中,我试图模拟以下场景:
-
消息队列已启动。
设计为在消息处理期间失败的使用者已启动。
产生了一条消息。
消费者开始处理消息。
在处理过程中会抛出异常以模拟消息处理失败。失败的消费者被停止。
启动另一个消费者以获取重新传递的消息。
但是我的测试失败了,消息没有重新传递给新的消费者。我会很感激这方面的任何提示。
MessageProcessingFailureAndReprocessingTest.java
@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests
@Autowired
private FailureReprocessTestScenario testScenario;
@Before
public void setUp()
testScenario.start();
@After
public void tearDown() throws Exception
testScenario.stop();
@Test public void
should_reprocess_task_after_processing_failure()
try
Thread.sleep(20*1000);
assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]
"task-1",
)));
catch (InterruptedException e)
fail();
@Configurable
public static class FailureReprocessTestScenario
@Autowired
public BrokerService broker;
@Autowired
public MockTaskProducer mockTaskProducer;
@Autowired
public FailingWorker failingWorker;
@Autowired
public SucceedingWorker succeedingWorker;
@Autowired
public TaskScheduler scheduler;
public void start()
Date now = new Date();
scheduler.schedule(new Runnable()
public void run() failingWorker.start();
, now);
Date after1Seconds = new Date(now.getTime() + 1*1000);
scheduler.schedule(new Runnable()
public void run() mockTaskProducer.produceTask();
, after1Seconds);
Date after2Seconds = new Date(now.getTime() + 2*1000);
scheduler.schedule(new Runnable()
public void run()
failingWorker.stop();
succeedingWorker.start();
, after2Seconds);
public void stop() throws Exception
succeedingWorker.stop();
broker.stop();
@Configuration
@ImportResource(value="classpath:applicationContext-jms.xml",
"classpath:applicationContext-task.xml")
public static class ContextConfig
@Autowired
private ConnectionFactory jmsFactory;
@Bean
public FailureReprocessTestScenario testScenario()
return new FailureReprocessTestScenario();
@Bean
public MockTaskProducer mockTaskProducer()
return new MockTaskProducer();
@Bean
public FailingWorker failingWorker()
TaskListener listener = new TaskListener();
FailingWorker worker = new FailingWorker(listenerContainer(listener));
listener.setProcessor(worker);
return worker;
@Bean
public SucceedingWorker succeedingWorker()
TaskListener listener = new TaskListener();
SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
listener.setProcessor(worker);
return worker;
private DefaultMessageListenerContainer listenerContainer(TaskListener listener)
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConnectionFactory(jmsFactory);
listenerContainer.setDestinationName("tasksQueue");
listenerContainer.setMessageListener(listener);
listenerContainer.setAutoStartup(false);
listenerContainer.initialize();
return listenerContainer;
public static class FailingWorker implements TaskProcessor
private Logger LOG = Logger.getLogger(FailingWorker.class.getName());
private final DefaultMessageListenerContainer listenerContainer;
public FailingWorker(DefaultMessageListenerContainer listenerContainer)
this.listenerContainer = listenerContainer;
public void start()
LOG.info("FailingWorker.start()");
listenerContainer.start();
public void stop()
LOG.info("FailingWorker.stop()");
listenerContainer.stop();
@Override
public void processTask(Object task)
LOG.info("FailingWorker.processTask(" + task + ")");
try
Thread.sleep(1*1000);
throw Throwables.propagate(new Exception("Simulate task processing failure"));
catch (InterruptedException e)
LOG.log(Level.SEVERE, "Unexpected interruption exception");
public static class SucceedingWorker implements TaskProcessor
private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());
private final DefaultMessageListenerContainer listenerContainer;
public final List<String> processedTasks;
public SucceedingWorker(DefaultMessageListenerContainer listenerContainer)
this.listenerContainer = listenerContainer;
this.processedTasks = new ArrayList<String>();
public void start()
LOG.info("SucceedingWorker.start()");
listenerContainer.start();
public void stop()
LOG.info("SucceedingWorker.stop()");
listenerContainer.stop();
@Override
public void processTask(Object task)
LOG.info("SucceedingWorker.processTask(" + task + ")");
try
TextMessage taskText = (TextMessage) task;
processedTasks.add(taskText.getText());
catch (JMSException e)
LOG.log(Level.SEVERE, "Unexpected exception during task processing");
TaskListener.java
public class TaskListener implements MessageListener
private TaskProcessor processor;
@Override
public void onMessage(Message message)
processor.processTask(message);
public void setProcessor(TaskProcessor processor)
this.processor = processor;
MockTaskProducer.java
@Configurable
public class MockTaskProducer implements ApplicationContextAware
private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());
@Autowired
private JmsTemplate jmsTemplate;
private Destination destination;
private int taskCounter = 0;
public void produceTask()
LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");
taskCounter++;
jmsTemplate.send(destination, new MessageCreator()
@Override
public Message createMessage(Session session) throws JMSException
TextMessage message = session.createTextMessage("task-" + taskCounter);
return message;
);
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException
destination = applicationContext.getBean("tasksQueue", Destination.class);
【问题讨论】:
当我设置listenerContainer.setSessionTransacted(true)
时,我看到消息被重新发送,但只发送到FailingWorker
。停止相应监听容器后的事件,SucceedingWorker
永远不会收到重新传递的消息。
我似乎 listenerContainer.stop()
-method 没有关闭与提供的连接,因此 JMS 提供者继续尝试将失败的消息重新传递回同一消费者。为了避免失败的消费者应该在某个时候调用listenerContainer.shutdown()
。
【参考方案1】:
显然,我昨天查看的文档来源Creating Robust JMS Applications 在某种程度上误导了我(或者我可能理解错误)。尤其是那段摘录:
在 JMS 消息被确认之前,它不被认为是 成功消费。成功消费一条消息 通常分三个阶段进行。
客户端收到消息。 客户端处理消息。 消息已确认。确认由 JMS 提供者或客户端发起,具体取决于会话 确认模式。
我假设 AUTO_ACKNOWLEDGE 正是这样做的 - 在侦听器方法返回结果后确认消息。但是根据 JMS 规范,它有点不同,并且 Spring 侦听器容器不会像预期的那样尝试改变 JMS 规范的行为。这就是 AbstractMessageListenerContainer 的 javadoc 必须说的 - 我已经强调了重要的句子:
侦听器容器提供以下消息确认 选项:
“sessionAcknowledgeMode”设置为“AUTO_ACKNOWLEDGE”(默认):在监听器执行前自动确认消息;在抛出异常的情况下不重新投递。 “sessionAcknowledgeMode”设置为“CLIENT_ACKNOWLEDGE”:监听执行成功后自动消息确认;不 在抛出异常的情况下重新发送。 “sessionAcknowledgeMode”设置为“DUPS_OK_ACKNOWLEDGE”:在监听器执行期间或之后延迟消息确认;潜在的 在抛出异常的情况下重新发送。 “sessionTransacted”设置为“true”:监听器执行成功后的事务确认;在抛出异常的情况下保证重新交付。
所以我的解决方案的关键是listenerContainer.setSessionTransacted(true);
我遇到的另一个问题是 JMS 提供者不断将失败的消息重新传递回在处理消息期间失败的同一消费者。我不知道 JMS 规范是否给出了提供者在这种情况下应该做什么的规定,但对我有用的是使用listenerContainer.shutdown();
来断开失败的消费者并允许提供者重新传递消息并给出另一个消费者的机会。
【讨论】:
以上是关于如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?的主要内容,如果未能解决你的问题,请参考以下文章
JMS createSession(false, Session.AUTO_ACKNOWLEDGE); 两个参数不同组合下的区别