ActiveMQ 重新传递不起作用

Posted

技术标签:

【中文标题】ActiveMQ 重新传递不起作用【英文标题】:ActiveMQ redelivery does not work 【发布时间】:2017-05-02 03:29:15 【问题描述】:

我正在尝试使用 ActiveMQ 实现死信队列。不幸的是,这方面的文档在某些方面相当模糊,我似乎无法正确设置所有内容。

我配置了以下 Bean:

@Bean
public JmsTemplate createJMSTemplate() 
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;


@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;


@Bean
public ConnectionFactory getActiveMQConnectionFactory() 
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;

这是我的接收代码:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException 
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue ''", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();


@PreDestroy
private void destroy() throws JMSException 
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();


private class SegmentReceiver implements MessageListener 

    @Override
    public void onMessage(Message message) 
        logger.info("onMessage");
        try 
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) 
                throw new IOException("This segment is expected to fail");
            
            System.out.println(segment.getText());
            message.acknowledge();
        
        catch(IOException | JMSException exception) 
            logger.error(exception.toString());
            try 
                QueueReceiver.this.session.rollback();
             catch (JMSException e) 
                logger.error(e.toString());
            
            throw new RuntimeException(exception);
        
    


但是,什么也没有发生。我正在使用默认配置的开箱即用 Apache ActiveMQ 5.14.2 设置。我在这里想念什么?

【问题讨论】:

【参考方案1】:

因为你正在使用 this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); 调用message.acknowledge(); 与调用session.acknowledge(); 相同。

要使 ActiveMQ 重新交付成功地与您的配置一起工作,有一些可能性需要进行最小的更改:

    致电QueueReceiver.this.session.recover(); 代替打电话QueueReceiver.this.session.rollback();

void org.apache.activemq.ActiveMQSession.recover() 抛出 JMSException

在此会话中停止消息传递,并重新开始消息传递 带有最早的未确认消息。

所有消费者都按顺序传递消息。承认一个 收到的消息会自动确认所有已收到的消息 已交付给客户。

重新启动会话会导致它执行以下操作: •停止 消息传递 •标记所有可能已传递的消息 但未确认为“重新交付” •重新启动交付顺序 包括以前所有未确认的消息 发表。重新传递的消息不必准确传递 他们的原始交货订单。

    使用 this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 并打电话 ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge(); ,注意不调用这个方法就像一个回滚,意味着消息没有被确认并且在onMessage 方法中抛出异常将调用 org.apache.activemq.ActiveMQMessageConsumer.rollback() 的QueueReceiver.this.consumer.rollback();

    只需调用QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback() 代替调用QueueReceiver.this.session.rollback();

【讨论】:

我似乎在使用ActiveMQSession.INDIVIDUAL_ACKNOWLEDGEthis.session.rollback() 创建会话方面取得了一些成功。消息仅在应用程序停止时添加到 DLQ,但它似乎无限次重试消息,而不是我配置的 3 次重新传递。【参考方案2】:

所以事实证明这是一个问题的组合:

会话确认模式需要设置为ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE 我使用的是session.recover() 而不是rollback()

ActiveMQ 代理配置不正确。我需要将此位添加到 activemq.xml 配置文件中(将其放在<broker> 标记下)。

    <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" >
            <!-- The constantPendingMessageLimitStrategy is used to prevent
                 slow topic consumers to block producers and affect other consumers
                 by limiting the number of messages that are retained
                 For more information, see:

                 http://activemq.apache.org/slow-consumer-handling.html

            -->
          <pendingMessageLimitStrategy> 
            <constantPendingMessageLimitStrategy limit="1000"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
        <!-- Set the following policy on all queues using the '>' wildcard -->
        <policyEntry queue=">">
            <deadLetterStrategy>
                <!--
                  Use the prefix 'DLQ.' for the destination name, and make
                  the DLQ a queue rather than a topic
                -->
                <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
            </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

确保您没有激活任何可能与您的 ActiveMQConnectionFactory 配置混淆的redeliveryPlugin

【讨论】:

以上是关于ActiveMQ 重新传递不起作用的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ 经典到 ActiveMQ Artemis 故障转移不起作用

具有接收器线程配置文件配置的 Mule ActiveMQ JMS 组件不起作用

在 sock.js 上使用 stomp.js 和 ActiveMQ-Apollo 似乎不起作用

TableViewCell 中的动态高度不起作用

传递凭据不起作用

在两个视图控制器之间传递数据不起作用