如果在消费消息后出现任何错误,如何将消息保存在 JMS 消息队列中?

Posted

技术标签:

【中文标题】如果在消费消息后出现任何错误,如何将消息保存在 JMS 消息队列中?【英文标题】:How to hold messages in JMS Message Queue if there are any error after consuming the message? 【发布时间】:2015-07-27 19:10:26 【问题描述】:

我的场景是 - 我将消息发布到队列中,一旦消息被消费,我就会将其发送到第三方中间件应用程序。如果该中间件应用程序已关闭,那么我发布的消息将被扔掉。如果中间件应用程序关闭,我不想丢失该消息,而是希望它保持或在队列中等待。请建议,如何处理这种情况?

【问题讨论】:

来自您的持久消息怎么样 --> 第三方?,您与第三方的确认系统是什么?即你怎么知道他们是否启动了,即使他们启动了他们也没有出错并扔掉你的味精? --你和第三者有什么合同? 我正在使用 RESTful Web 服务合约与第三方应用程序进行交互。如果服务器关闭,它将尝试连接几次,如果失败或连接丢失消息,我们会收到错误错误消息。 是否可以设置死信队列,然后定期将这些消息移回主队列? 【参考方案1】:

您应该像这样创建会话:

Session session = connection.createSession(false,
                       Session.CLIENT_ACKNOWLEDGE);

当您尝试将消息传递到您的第三方应用时:

如果它正常工作,您应该确认该消息。

如果它关闭了,你不应该确认它,这样 JMS 提供者将能够重新传递它,并且消息不会丢失。 message.acknowledge();

另外,你可以看看这个:JMS AUTO_ACKNOWLEDGE when is it acknowledged?

【讨论】:

【参考方案2】:

JMS 队列不是消息存储。

如果您有一条“坏消息”而处理继续失败,那么 JMS 服务器(如果已配置)将不可避免地将该消息转储到“死消息队列”,该队列将慢慢填满,直到另一个进程耗尽它。

您不想将坏消息保留在队列中,因为它们可能会阻塞队列(假设您有 10 个消费者,并且前 10 条消息都是坏消息,所以所有进程所做的就是继续连枷到坏消息消息——停止队列)。

因此,您需要一些机制将消息存储到异常接收器中,然后可以将它们注入到主队列中进行处理。

死消息队列不是这种机制(不要将消息存储在 JMS 队列中),而是一种将异常消息路由到更永久存储区域(即数据库表或其他东西)的机制。

在那里,他们可以被审查(自动、手动等)并重新交付或取消。

但关键是您需要一个外部机制来实现这一点,仅 JMS 服务器并不适合此类消息。

【讨论】:

“不要将消息存储在 JMS 队列中”您能进一步解释一下吗?队列可以是持久化的,为什么不存储在里面呢? 在某些情况下,您需要按严格的顺序处理消息。在这种情况下,只要未成功使用坏消息,它们就会保留在队列中。【参考方案3】:

这可以通过我使用会话确认来实现。为此,首先修改您的生产者代码以使用 Session.AUTO_ACKNOWLEDGE。在创建队列会话时,将 AUTO_ACKNOWLEDGE 设为 false。这意味着消费者必须承认。当消费者发送消息确认后,该消息将从队列中删除,否则将保留在队列中。

下面是生产者代码。

try 
        QueueConnectionFactory qcf = AppUtils.getQueueConnectionFactory(); 
        Queue q = AppUtils.getDestination();
        QueueConnection qConnection = qcf.createQueueConnection();
        QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        QueueSender qSender = qSession.createSender(q);

        qConnection.start();
        TextMessage msg = qSession.createTextMessage("Hello");
        qSender.send(msg);

        qSender.close();
        qConnection.close();

     catch (JMSException e) 
        // log your error to log file
        e.printStackTrace();
    

在消费者方面,您必须做同样的事情,创建一个队列会话,将 AUTO_ACKNOWLEDGE 设置为 false。

处理完您的消息后,您可以发送确认以从队列中删除该消息,否则该消息将保留在队列中。

try 
        QueueConnectionFactory qcf = getQueueConnectionFactory(); 
        Queue q = getDestination();
        QueueConnection qConnection = qcf.createQueueConnection();
        QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        QueueReceiver qReceiver = qSession.createReceiver(q);

        qConnection.start();
        Message msg = qReceiver.receive();

  // here send your message to third party application 
  //if your third party application is down 
        if(thirdpartyapp is down)
          //here you can raise an exception 
          //or just do nothing 
          // you're not sending acknowledgement here so the msg will 
           //remain in the queue
        else      
          msg.acknowledge();//here youre sending ack, so msg will be deleted
          qReceiver.close();
          qConnection.close();
        

     catch (JMSException e) 
        // TODO Auto-generated catch block
        e.printStackTrace();
    

【讨论】:

【参考方案4】:

虽然目前尚不清楚您的消费消息应用程序是如何设计的,但我建议您修改消费者应用程序以在本地事务中消费消息并将消息发布到第三方中间件应用程序。如果发布成功,那么您可以提交将从 JMS 队列中删除消息的事务。如果您的应用程序无法发布消息,您可以简单地回滚事务,这将使消息重新出现在 JMS 队列中。该消息将再次发送。

【讨论】:

你能举个代码例子说明你上面描述的可以实现吗?

以上是关于如果在消费消息后出现任何错误,如何将消息保存在 JMS 消息队列中?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 AMQP 中停止消费消息一段时间(时间不固定)?

Kafka - 如何在使用高级消费者的每条消息后提交偏移量?

如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?

一条消息怎么被两个消费者消费

系统学习消息队列分享(七) 如何处理消费过程中的重复消息?

Spring Cloud Stream如何消费自己生产的消息