避免 JMS/ActiveMQ 上的重复消息

Posted

技术标签:

【中文标题】避免 JMS/ActiveMQ 上的重复消息【英文标题】:Avoiding duplicated messages on JMS/ActiveMQ 【发布时间】:2011-06-23 11:49:36 【问题描述】:

有没有办法抑制 ActiveMQ 服务器上定义的队列上的重复消息?

我尝试手动定义 JMSMessageID,(message.setJMSMessageID("uniqueid")),但服务器忽略了此修改并使用内置生成的 JMSMessageID 传递消息。

按照规范,我没有找到有关如何删除重复消息的参考。

在 HornetQ 中,为了解决这个问题,我们需要在消息定义中声明 HQ 特定的属性 org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

即:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

有人知道 ActiveMQ 是否有类似的解决方案?

【问题讨论】:

【参考方案1】:

你应该看看 Apache Camel,它提供了一个可以与任何 JMS 提供者一起工作的幂等消费者组件,请参阅:http://camel.apache.org/idempotent-consumer.html

将它与 ActiveMQ 组件结合使用使得 JMS 的使用变得非常简单,请参阅: http://camel.apache.org/activemq.html

【讨论】:

我怀疑这种方法是否能解决我的问题。在此实例在队列中期间,我只需要保留一个具有相同 JMSMessageID 的消息实例。我需要它作为一个集合工作。在从队列中删除最新的同上元素后,我希望能够使用相同的 JMSMessageID 放置其他消息。我需要实现它并进行测试。但是,基于 EAI 书中描述的 Idempotent,我认为这个概念与我的必要性不符。但是,建议的解决方案很好。我将对此进行更多研究并在这里评论我的结果。谢谢【参考方案2】:

我怀疑 ActiveMQ 是否原生支持它,但实现幂等消费者应该很容易。一种方法是在生产者端为每条消息添加一个唯一标识符,现在在消费者端使用存储(数据库、缓存等),可以检查该消息之前是否已收到,并且根据该检查继续处理。

我看到了之前的 *** 问题 - Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages? ,这也可能有所帮助。

【讨论】:

由于消费者本身可以是多线程的,因此要识别其是否重复,必须实现分布式/内存锁定。对吗?【参考方案3】:

现在支持删除嵌入到 ActiveMQ 传输中的重复消息。查看Connection Configuration Guide中的配置值auditDepthauditMaximumProducerNumber

【讨论】:

您如何实际配置这些参数以避免重复? @Thomas 我不确定你在问什么。一般来说,如何在 ActiveMQ 中应用配置?或者为这些特定字段使用什么值? 只是从参数的描述来看,我听的不是很清楚。 auditDepth 例如,那里的值是指消息的 Nb 还是将被屏蔽以进行重复的字节的 nb?关于auditMaximumProducerNumber,这是否意味着将筛选的生产者数量有限?顺便说一句,如果一条内容相同的消息由 2 个不同的订阅者发布,那么这条消息是否会被认为是重复的? @Chris IIUC 这些参数保证检测到多达 64 个生产者的 2048 条消息的重复项。但是 ActiveMQ 如何确定什么是重复的呢?如果它是由 JMSMessageID 设置的,那么我们将回到第一方,因为我们无法设置它。【参考方案4】:

有一种方法可以让 ActiveMQ 根据 JMS 属性过滤重复项。它涉及编写一个 Activemq Plugin。将重复消息发送到死信队列的基本代理过滤器将是这样的

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;

public class DuplicateFilterBroker extends BrokerFilter 
    String messagePropertyName;
    boolean switchValue;

    public DuplicateFilterBroker(Broker next, String messagePropertyName) 
        super(next);
        this.messagePropertyName = messagePropertyName;
    

    public boolean hasDuplicate(String propertyValue)
        switchValue = propertyValue;
        return switchValue;
    

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception  
        ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
        Object msgObj = msg.getMessage(); 
        if (msgObj instanceof javax.jms.Message)  
            javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
            if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) 
                super.send(producerExchange, msg);
            
            else 
               sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
             
        
      

【讨论】:

这个插件如何决定使用哪个属性来进一步过滤重复消息,关于用例的解释对于集成这样的插件非常有帮助。提前感谢您的回答。【参考方案5】:

似乎问题中建议的方式也适用于 ActiveMQ(2016/12)。请参阅activemq-artemis 指南。这需要生产者在消息中设置特定属性。

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

但是包含该属性的类是不同的: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID,属性值为_AMQ_DUPL_ID

【讨论】:

以上是关于避免 JMS/ActiveMQ 上的重复消息的主要内容,如果未能解决你的问题,请参考以下文章

JMS + ActiveMQ

Spring JMS(ActiveMQ) 延迟消息传递

深入浅出JMS--ActiveMQ简单介绍以及安装

深入浅出JMS--ActiveMQ简单介绍以及安装

深入浅出JMS--ActiveMQ简单介绍以及安装

深入浅出JMS--ActiveMQ简单的HelloWorld实例