我是不是正确实施了 ActiveMQ?实现交易会话并重试

Posted

技术标签:

【中文标题】我是不是正确实施了 ActiveMQ?实现交易会话并重试【英文标题】:Am I implementing ActiveMQ correctly? To achieve a transacted session and retry我是否正确实施了 ActiveMQ?实现交易会话并重试 【发布时间】:2016-01-20 22:51:35 【问题描述】:

我正在尝试提出一个 JMS-ActiveMQ 实现,它支持使用事务处理会话进行回滚。 我对 ActiveMQ 非常陌生,我已经使用它的 Java 库首次尝试了实现。

当我运行我的应用程序时,我看到消息已成功入队和出队。我还可以看到自动生成了对应的DLQ。但是,我不确定我是否正确配置了 redeliverypolicy。截至目前,它已在生产者上配置,但有些examples 将重新传递策略与侦听器容器联系起来,所以我不完全确定中毒消息是否会放在 DLQ 上,在我的情况下(如果有的话)。详细的 cmets 在 sn-ps 中。

此外,到目前为止,我遇到的所有示例都使用 Spring。但是,我可以选择使用它,这需要重新连接整个项目(如果它只涉及最小的开销,我会打开)。

任何关于我如何使用 ActiveMQ api 在 Java 中做到这一点的见解将不胜感激。

制片人

public void publishUpdate(final MessageBody payload)
            throws JMSException 
        Session session = session(connection());
        try 
            Message message = message(session, payload);
            LOGGER.info("About to put message on queue");
            producer(session).send(message);
            // without session.commit()-- no messages get put on the queue.
            session.commit();// messages seem to be enqueued now.
            
         catch ( BadRequestException e)  //to avoid badly formed requests?
            LOGGER.info("Badly formed request. Not attempting retry!");
            return;
         catch (JMSException jmsExcpetion) 

            LOGGER.info("Caught JMSException will retry");
            session.rollback();// assume rollback is followed by a retry?
                 
    

  private MessageProducer producer(Session session) throws JMSException 
        return session.createProducer(destination());
    

   private Connection connection() throws JMSException 
        ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory();
        Connection connection = connectionFactory.createConnection();
     connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());//redelivery policy with three retries and redelivery time of 1000ms  
        return connection;
    

 private Session session(Connection connection) throws JMSException 
        Session session = connection.createSession(true,
                Session.SESSION_TRANSACTED);
        connection.start();
        return session;
     

听众:

public class UpdateMessageListener implements MessageListener
….
    public void onMessage(Message message) 
        String json = null;
        try 
            //Does the listener need to do anything to deal with retry?
            json = ((TextMessage) message).getText();
            MessageBody request = SerializeUtils.deserialize(json, MessageBody.class);
            processTransaction(request.getUpdateMessageBody(), headers);//perform some additional processing.
             catch (Throwable e) 
            LOGGER.error("Error processing request: ", json);
        
    

消费者:

  private MessageConsumer consumer() throws JMSException 
        LOGGER.info("Creating consumer");
            MessageConsumer consumer = session().createConsumer(destination());
            consumer.setMessageListener(new UpdateMessageListener()); //wire listener to consumer
        return consumer;
    
    private Session session() throws JMSException 
        Connection connection=connection();
         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//create an auto-ack  from the consumer side? Is this correct?
         connection.start();
         return session;
  

如有必要,我也愿意提供更多代码。

【问题讨论】:

【参考方案1】:

您的解决方案有一点缺陷。

根据JMS document,如果onMessage函数中存在异常,执行失败的消息将由Session.AUTO_ACKNOWLEDGE模式下的消息队列(如ActiveMQ)重新传递。但是这个流程被打破了,因为监听器在 onMessage 函数中捕获了 Throwable 或 Exception。数据流如下图所示:

ACK_TYPE

如果你想实现本地事务,当消息处理程序执行失败时应该抛出异常。在伪代码中,异步重新传递消息消费者可能如下所示:

Session session = connection.getSession(consumerId);  
sessionQueueBuffer.enqueue(message);  
Runnable runnable = new Ruannale()  
    run()  
        Consumer consumer = session.getConsumer(consumerId);  
        Message md = sessionQueueBuffer.dequeue();  
        try  
            consumer.messageListener.onMessage(md);  
            ack(md);//send an STANDARD_ACK_TYPE, tell broker success
        catch(Exception e)  
            redelivery();//send redelivery ack. DELIVERED_ACK_TYPE, require broker keep message and redeliver it.
      
   
threadPool.execute(runnable);

【讨论】:

【参考方案2】:

    生产端事务通常仅用作分布式事务的一部分。如果此发布者是源,则无需承担开销。

    您无法使用 MessageListener b/c 进行事务处理,因为您没有会话句柄。你必须回退到一个receive() 循环。签出.. 使用 ActiveMQSession 获得 INDIVIDUAL_MESSAGE 确认模式,以获得漂亮的每消息事务模式。

【讨论】:

以上是关于我是不是正确实施了 ActiveMQ?实现交易会话并重试的主要内容,如果未能解决你的问题,请参考以下文章

如何拒绝消息

.Net NMS.ActiveMQ 我应该存储消息发送调用之间的会话和连接吗

我是不是正确实施了我的 DAO?

我是不是正确实施了 SCD 类型 1 和 7

SbDrmServerCertificateUpdatedFunc 是不是正确实施?

正确配置 ActiveMQ 以避免 Producer 内存泄漏