如何尝试在并发环境中重新启动() JMS 连接?

Posted

技术标签:

【中文标题】如何尝试在并发环境中重新启动() JMS 连接?【英文标题】:How can I attempt to restart() JMS connection in concurrent environment? 【发布时间】:2021-03-02 13:44:33 【问题描述】:

我有一个 JMS 应用程序,它试图从 JBosss 队列中读取数据。我在课堂上实现了MessageListener,并使用onMessage() 接收消息

public class JBossConnector implements MessageListener, AutoCloseable 

这是我的方法:

/**
 * The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
 * If the message is of type MessageObject, then transfer that to Appia
 *
 * @param message JMS Message
 */
@Override
public void onMessage(Message message) 

    // receive the message from jboss queue: 'jbossToAppia'
    // then post it to appia
    if (message instanceof ObjectMessage) 
        try 

            MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
            System.out.printf("JbossConnector: MessageObject received from JBOSS, %s\n", messageObject.getMessageType());

            component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);

         catch (MessageFormatException exception) 
            logger.error(ExceptionHandler.getFormattedException(exception));
            ExceptionHandler.printException(exception);
         catch (JMSException exception) 
            ExceptionHandler.printException(exception);
            restart();
        
     else 
        System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)\n", this.getClass().getSimpleName());
    

每当我找到JMSException 时,我都会尝试重新启动 JBoss 连接(上下文、连接、会话、接收方、发送方)。我的疑问是我读过onMessage() 使用多个线程从队列接收消息(如果我错了,请纠正我)。

当 JBoss 队列连接断开时,至少会有一些队列抛出此异常。这意味着他们都会尝试restart() 连接,这是浪费时间(restart() 首先关闭所有连接,将变量设置为 null,然后尝试启动连接)。

现在我可以做类似的事情

synchronized (this)
    restart();

或使用volatile 变量。但这并不能保证当当前线程完成restart() 操作时其他线程不会尝试restart()(如果我错了,请再次纠正我)。

有什么办法可以解决这个问题吗?

【问题讨论】:

【参考方案1】:

MessageListeneronMessage() 确实是从它自己的线程中运行的,因此您需要适当的并发控制。我认为最简单的解决方案就是使用java.util.concurrent.atomic.AtomicBoolean。例如,在您的 restart() 方法中,您可以执行以下操作:

private void restart() 
   AtomicBoolean restarting = new AtomicBoolean(false);

   if (!restarting.getAndSet(true)) 
      // restart connection, session, etc.
   

这将使restart() 方法有效地幂等。多个线程将能够调用restart(),但只有第一个调用它的线程实际上会导致资源被重新创建。所有其他调用将立即返回。

【讨论】:

非常感谢。我正要同步所有东西,弄得一团糟

以上是关于如何尝试在并发环境中重新启动() JMS 连接?的主要内容,如果未能解决你的问题,请参考以下文章

在 ActiveMQ 代理不可用时重试建立 JMS 连接

如何确定 JMS 连接是不是存在?

如何在 AUTO_ACKNOWLEDGE JMS 会话场景中模拟消息重新传递?

错误 503 centeron 如何正确重定向

使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?

Spring Boot 2 重新部署到 Wildfly 10 后无法刷新 JMS 连接