ActiveMQ 批量消费者

Posted

技术标签:

【中文标题】ActiveMQ 批量消费者【英文标题】:ActiveMQ batch consumer 【发布时间】:2021-12-02 10:42:42 【问题描述】:

我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有一种方法/配置可以从主题中批量消费消息,而不是逐条读取消息并为每条消息进行数据库调用。

我在想象最终解决方案将执行以下操作:

    以 100 个批量消耗消息 使用 mongo bulk insert 将批处理保存到 DB 中 向代理发送成功插入消息的 ACK 和失败消息的 NAK。

【问题讨论】:

【参考方案1】:

JMS API 仅允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener 还是通过 JMS 1.1 中的 javax.jms.MessageConsumer#receive() 或 JMS 2 中的 javax.jms.JMSConsumer.receive() 同步调用。但是,您可以批量接收多条消息使用事务处理会话。以下是the javax.jms.Session JavaDoc 对事务会话的评价:

可以将会话指定为已处理。每个事务会话支持单个系列事务。每个事务将一组消息发送和一组消息接收组合成一个原子工作单元。实际上,事务将会话的输入消息流和输出消息流组织成一系列原子单元。当一个事务提交时,它的原子输入单元被确认并且它的相关原子输出单元被发送。如果事务回滚完成,事务发送的消息将被销毁,会话的输入会自动恢复。

因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这基本上充当否定确认)。例如:

final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) 
   List messages = new ArrayList<Message>();
   for (int i = 0; i < TX_SIZE; i++) 
      Message message = consumer.receive(1000);
      if (message != null) 
         messages.add(message);
       else 
         break; // no more messages available for this batch
      
   

   if (messages.size() > 0) 
      try 
         // bulk insert data from messages List into Mongo
         session.commit();
       catch (Exception e) 
         e.printStackTrace();
         session.rollback();
      
    else 
      break; // no more messages in the subscription
   

值得注意的是,如果您只使用 JMS 事务会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 之后但在提交事务会话之前崩溃)。 XA 事务会为您减轻这种风险,但会增加相当多的复杂性,具体取决于您的环境。

最后,如果您遇到 ActiveMQ“经典”的性能限制,请考虑使用来自 ActiveMQ 的下一代消息代理 ActiveMQ Artemis。

【讨论】:

【参考方案2】:

@Nabeel Ahmad 你可能有兴趣在 ActiveMQ 中查看Virtual Topics。它们提供了在生产者端使用主题的能力,然后使用队列进行消费。当想要扩展消费时,它们非常有用,因为使用队列比消费者方面的主题拥有更多的功能和可观察性。

将此配置添加到activemq.xml

<destinationInterceptors> 
  <virtualDestinationInterceptor> 
    <virtualDestinations> 
      <virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>   
    </virtualDestinations>
  </virtualDestinationInterceptor> 
</destinationInterceptors>

然后让生产者发送到:topic://VT.DATA

然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA

正如@Justin Bertram 所提到的,批量读取可以使用事务处理会话完成,并且每 100 条左右的消息提交一次。

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");

Message message = null;
long count = 0l;
do 
  message = messageConsumer.receive(2000l);
  
  if(message != null) 
     // check the message and publisher.send() to .DLQ if it is bad

     // if message is good, send to Mongo

     if(count % 100 == 0) 
        // commit every 100 messages on the JMS-side
        session.commit();
     
  
 while(message != null);

【讨论】:

以上是关于ActiveMQ 批量消费者的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ:单个生产者,多个消费者

activemq 通配符消费者如何工作?

如何将消息从 Activemq 推送到消费者

ActiveMQ 消费者挂起

activemq消息一直pending

ActiveMQ:使用队列(具有并发消费者)和主题的正确配置