避免 JMS/ActiveMQ 上的重复消息
Posted
技术标签:
【中文标题】避免 JMS/ActiveMQ 上的重复消息【英文标题】:Avoiding duplicated messages on JMS/ActiveMQ 【发布时间】:2011-06-23 11:49:36 【问题描述】:有没有办法抑制 ActiveMQ 服务器上定义的队列上的重复消息?
我尝试手动定义 JMSMessageID,(message.setJMSMessageID("uniqueid")),但服务器忽略了此修改并使用内置生成的 JMSMessageID 传递消息。
按照规范,我没有找到有关如何删除重复消息的参考。
在 HornetQ 中,为了解决这个问题,我们需要在消息定义中声明 HQ 特定的属性 org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。
即:
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
有人知道 ActiveMQ 是否有类似的解决方案?
【问题讨论】:
【参考方案1】:你应该看看 Apache Camel,它提供了一个可以与任何 JMS 提供者一起工作的幂等消费者组件,请参阅:http://camel.apache.org/idempotent-consumer.html
将它与 ActiveMQ 组件结合使用使得 JMS 的使用变得非常简单,请参阅: http://camel.apache.org/activemq.html
【讨论】:
我怀疑这种方法是否能解决我的问题。在此实例在队列中期间,我只需要保留一个具有相同 JMSMessageID 的消息实例。我需要它作为一个集合工作。在从队列中删除最新的同上元素后,我希望能够使用相同的 JMSMessageID 放置其他消息。我需要实现它并进行测试。但是,基于 EAI 书中描述的 Idempotent,我认为这个概念与我的必要性不符。但是,建议的解决方案很好。我将对此进行更多研究并在这里评论我的结果。谢谢【参考方案2】:我怀疑 ActiveMQ 是否原生支持它,但实现幂等消费者应该很容易。一种方法是在生产者端为每条消息添加一个唯一标识符,现在在消费者端使用存储(数据库、缓存等),可以检查该消息之前是否已收到,并且根据该检查继续处理。
我看到了之前的 *** 问题 - Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages? ,这也可能有所帮助。
【讨论】:
由于消费者本身可以是多线程的,因此要识别其是否重复,必须实现分布式/内存锁定。对吗?【参考方案3】:现在支持删除嵌入到 ActiveMQ 传输中的重复消息。查看Connection Configuration Guide中的配置值auditDepth
和auditMaximumProducerNumber
。
【讨论】:
您如何实际配置这些参数以避免重复? @Thomas 我不确定你在问什么。一般来说,如何在 ActiveMQ 中应用配置?或者为这些特定字段使用什么值? 只是从参数的描述来看,我听的不是很清楚。auditDepth
例如,那里的值是指消息的 Nb 还是将被屏蔽以进行重复的字节的 nb?关于auditMaximumProducerNumber
,这是否意味着将筛选的生产者数量有限?顺便说一句,如果一条内容相同的消息由 2 个不同的订阅者发布,那么这条消息是否会被认为是重复的?
@Chris IIUC 这些参数保证检测到多达 64 个生产者的 2048 条消息的重复项。但是 ActiveMQ 如何确定什么是重复的呢?如果它是由 JMSMessageID 设置的,那么我们将回到第一方,因为我们无法设置它。【参考方案4】:
有一种方法可以让 ActiveMQ 根据 JMS 属性过滤重复项。它涉及编写一个 Activemq Plugin。将重复消息发送到死信队列的基本代理过滤器将是这样的
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;
public class DuplicateFilterBroker extends BrokerFilter
String messagePropertyName;
boolean switchValue;
public DuplicateFilterBroker(Broker next, String messagePropertyName)
super(next);
this.messagePropertyName = messagePropertyName;
public boolean hasDuplicate(String propertyValue)
switchValue = propertyValue;
return switchValue;
public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception
ActiveMQMessage amqmsg = (ActiveMQMessage)msg;
Object msgObj = msg.getMessage();
if (msgObj instanceof javax.jms.Message)
javax.jms.Message jmsMsg = (javax.jms.Message) msgObj;
if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName)))
super.send(producerExchange, msg);
else
sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
【讨论】:
这个插件如何决定使用哪个属性来进一步过滤重复消息,关于用例的解释对于集成这样的插件非常有帮助。提前感谢您的回答。【参考方案5】:似乎问题中建议的方式也适用于 ActiveMQ(2016/12)。请参阅activemq-artemis 指南。这需要生产者在消息中设置特定属性。
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
但是包含该属性的类是不同的:
org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID
,属性值为_AMQ_DUPL_ID
。
【讨论】:
以上是关于避免 JMS/ActiveMQ 上的重复消息的主要内容,如果未能解决你的问题,请参考以下文章