从 JMS MessageListener 发出回滚信号

Posted

技术标签:

【中文标题】从 JMS MessageListener 发出回滚信号【英文标题】:Signal a rollback from a JMS MessageListener 【发布时间】:2011-11-05 01:24:06 【问题描述】:

我一直在使用 JMS 和 ActiveMQ。一切都在创造奇迹。我没有使用spring,我也不会。

接口javax.jms.MessageListener只有一种方法onMessage。在实现中,有可能会引发异常。如果实际上引发了异常,那么我说该消息未正确处理,需要重新尝试。所以,我需要 ActiveMQ 等待一会儿,然后重试。即我需要抛出的异常来回滚 JMS 事务。

我怎样才能完成这样的行为?

也许 ActiveMQ 中有一些配置我找不到。

或者...也许可以取消向消费者注册MessageListeners 并自己使用消息,在一个循环中,例如:

while (true) 
    // ... some administrative stuff like ...
    session = connection.createSesstion(true, SESSION_TRANSACTED)
    try 
        Message m = receiver.receive(queue, 1000L);
        theMessageListener.onMessage(m);
        session.commit();
     catch (Exception e) 
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    
    // ... some more administrative stuff

在几个线程中,而不是注册监听器。

或者...我可以以某种方式装饰/AOP/字节操作MessageListeners 来自己做这件事。

你会走哪条路,为什么?

注意:我无法完全控制MessageListeners 代码。

编辑 概念验证测试:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception 
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() 

        @Override
        public void onMessage(Message message) 
            TextMessage textMessage = (TextMessage) message;
            try 
                switch (atomicInteger.getAndIncrement()) 
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                
             catch (JMSException e) 
                e.printStackTrace(System.out);
            
        
    );
    connection.start();

    
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());

【问题讨论】:

非常感谢whaley 和@Ammar 的回答。我都赞成,因为你们都指出我走上了正确的道路。但还没有选择正确的答案。因为需要更多的测试。 【参考方案1】:

如果你想使用 SESSION_TRANSACTED 作为你的确认模式,那么你需要设置一个RedeliveryPolicy on your Connection/ConnectionFactory。 This page on ActiveMQ's website 还包含一些您可能需要做的有用信息。

由于您没有使用 Spring,您可以使用类似于以下代码(取自上述链接之一)的代码设置 RedeliveryPolicy:

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

编辑 将您的代码 sn-p 添加到答案中,以下显示了它如何与事务一起使用。尝试使用注释掉 Session.rollback() 方法的代码,您会发现使用 SESION_TRANSACTED 和 Session.commit/rollback 可以按预期工作:

@Test
public void test() throws Exception 
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() 

        public void onMessage(Message message) 
            TextMessage textMessage = (TextMessage) message;
            try 
                switch (atomicInteger.getAndIncrement()) 
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                
             catch (JMSException e) 
                e.printStackTrace(System.out);
            
        
    );
    connection.start();

    
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    assertEquals(3, atomicInteger.get());

【讨论】:

那行不通。但为我指明了正确的方向。我将离开 DUPS_OK_ACKNOWLEDGE,因为它似乎是我必须工作最少的工作。 您需要粘贴整个代码,因为您没有正确使用 Session。 DUPS_OK_ACKNOWLEDGE 似乎只是在工作,因为客户端确认是惰性的,并且代理将继续重新发送消息,直到客户端最终确认。 我粘贴了一个概念证明。我只能让它与 DUPS_OK_ACKNOWLEDGE 一起工作,而 message.acknowledgement 似乎没有什么不同。 非常感谢。是的,我会尝试一下 legacylistener.onmessage; session.commit; catch (Throwable t) session.rollback; 当我注册监听器时。【参考方案2】:

您需要将确认模式设置为Session.CLIENT_ACKNOWLEDGE,客户端通过调用消息的确认方法来确认已消费的消息。

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

然后,处理完消息后需要调用 Message.acknowledge() 方法才能删除该消息。

Message message = ...;
// Processing message

message.acknowledge();

【讨论】:

它不起作用。 onMessage 仍然会被调用一次,即使 message.acknowledge() 永远不会被调用。 您是否正确设置了确认方式?它必须设置为 Session.CLIENT_ACKNOWLEDGE ! 但它适用于 (false, Session.DUPS_OK_ACKNOWLEDGE)... message.acknowledge() 似乎不起作用。 如果他使用 CLIENT_ACKNOWLEDGE 那么他需要使用 Session.recover(),而不是 Session.rollback()。【参考方案3】:

有点晚了,但这里是-

我不会使用MessageListener,而是使用全局池来管理监听和处理。

ListeningPool -> 监听器 -> 提交处理任务 -> ProcessingPool -> 执行并确认或关闭而不确认。

    维护 2 个线程池,一个用于侦听器,一个用于处理器。 有一个监听 Runnable 实现,它在 while true 循环中监听队列,以及 consumer.receive(timeout) 方法。在 finally 块中,如果没有收到消息,则关闭连接、会话和使用者。如果收到消息,则向处理池提交一个包含所有 conn、会话、消息和消费者参数的任务。 有一个接收消息、连接、会话和消费者的处理实现。进行处理并确认一切正常。如果不是,请在不确认的情况下关闭。这将根据您的服务器的重新交付政策触发重新交付。 使用所有侦听器任务初始化您的侦听池,侦听消息,每个用于一个队列。使用应用运行时可接受的参数初始化处理池。
public class CustomMessageListener implements Runnable 

    private ConnectionFactory connectionFactory;
    private MessageProcessor processor;
    private long backOff;
    private boolean stopped = false;
    private Executor processPool;

    public CustomMessageListener(ConnectionFactory connectionFactory,
        long backOff, MessageProcessor processor, Executor processPool) 
        this.connectionFactory = connectionFactory;
        this.backOff = backOff;
        this.processor = processor;
        this.processPool = processPool;
    

    @Override
    public void run() 
        while (!stopped) 
            listen();
        
    

    public void stop() 
        this.stopped = true;
    

    public void listen() 
        Connection c = null;
        Session s = null;
        MessageConsumer consumer = null;
        boolean received = false;
        try 
            c = connectionFactory.createConnection();
            s = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            consumer = s.createConsumer(...);
            Message message = consumer.receive(backOff); // waits maximum backOff ms for a message
            if (message != null) 
                received = true;
                // submit a task to processing pool...
                executor.submit(processor.process(message, s, consumer, c));
            
         catch (JMSException ex) 
            // log your exception
         finally 
            if (!received) 
                // close conn, session, consumer
            
        
    


public class MessageProcessor 

    public Runnable process(Message msg, Session s, MessageConsumer consumer, Connection conn) 
        return () - > 
            try 
                //do your processing
                msg.acknowledge(); // done
             catch (JMSException ex) 
                // log your exception
             finally 
                // close your resources
            
        ;
    

您可以致电stop() 停止收听更多消息,以便正常关机。在构造函数中包含queueName 以侦听特定队列。

【讨论】:

经验告诉我,Listener 不是要走的路。我同意你的基本前提【参考方案4】:

如果您的会话已被处理,那么“acknowledgeMode”无论如何都会被忽略。所以,只需让您的会话进行处理并使用 session.rollback 和 session.commit 来提交或回滚您的事务。

【讨论】:

我认为(我的)问题是在 MessageListener.onMessage(Message) 中无法访问会话。

以上是关于从 JMS MessageListener 发出回滚信号的主要内容,如果未能解决你的问题,请参考以下文章

JMS 是不是有队列窥视的概念?

如何尝试在并发环境中重新启动() JMS 连接?

activeMq 消费者整合spring

AWS SQS JMS确认

Spring整合JMS——消息监听器

Spring----监听器容器