ActiveMQ 批量消费者
Posted
技术标签:
【中文标题】ActiveMQ 批量消费者【英文标题】:ActiveMQ batch consumer 【发布时间】:2021-12-02 10:42:42 【问题描述】:我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有一种方法/配置可以从主题中批量消费消息,而不是逐条读取消息并为每条消息进行数据库调用。
我在想象最终解决方案将执行以下操作:
-
以 100 个批量消耗消息
使用 mongo bulk insert 将批处理保存到 DB 中
向代理发送成功插入消息的 ACK 和失败消息的 NAK。
【问题讨论】:
【参考方案1】:JMS API 仅允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener
还是通过 JMS 1.1 中的 javax.jms.MessageConsumer#receive()
或 JMS 2 中的 javax.jms.JMSConsumer.receive()
同步调用。但是,您可以批量接收多条消息使用事务处理会话。以下是the javax.jms.Session
JavaDoc 对事务会话的评价:
可以将会话指定为已处理。每个事务会话支持单个系列事务。每个事务将一组消息发送和一组消息接收组合成一个原子工作单元。实际上,事务将会话的输入消息流和输出消息流组织成一系列原子单元。当一个事务提交时,它的原子输入单元被确认并且它的相关原子输出单元被发送。如果事务回滚完成,事务发送的消息将被销毁,会话的输入会自动恢复。
因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这基本上充当否定确认)。例如:
final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true)
List messages = new ArrayList<Message>();
for (int i = 0; i < TX_SIZE; i++)
Message message = consumer.receive(1000);
if (message != null)
messages.add(message);
else
break; // no more messages available for this batch
if (messages.size() > 0)
try
// bulk insert data from messages List into Mongo
session.commit();
catch (Exception e)
e.printStackTrace();
session.rollback();
else
break; // no more messages in the subscription
值得注意的是,如果您只使用 JMS 事务会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 之后但在提交事务会话之前崩溃)。 XA 事务会为您减轻这种风险,但会增加相当多的复杂性,具体取决于您的环境。
最后,如果您遇到 ActiveMQ“经典”的性能限制,请考虑使用来自 ActiveMQ 的下一代消息代理 ActiveMQ Artemis。
【讨论】:
【参考方案2】:@Nabeel Ahmad 你可能有兴趣在 ActiveMQ 中查看Virtual Topics。它们提供了在生产者端使用主题的能力,然后使用队列进行消费。当想要扩展消费时,它们非常有用,因为使用队列比消费者方面的主题拥有更多的功能和可观察性。
将此配置添加到activemq.xml
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
然后让生产者发送到:topic://VT.DATA
然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA
正如@Justin Bertram 所提到的,批量读取可以使用事务处理会话完成,并且每 100 条左右的消息提交一次。
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");
Message message = null;
long count = 0l;
do
message = messageConsumer.receive(2000l);
if(message != null)
// check the message and publisher.send() to .DLQ if it is bad
// if message is good, send to Mongo
if(count % 100 == 0)
// commit every 100 messages on the JMS-side
session.commit();
while(message != null);
【讨论】:
以上是关于ActiveMQ 批量消费者的主要内容,如果未能解决你的问题,请参考以下文章