如何让我的 ActiveMQ 代理删除离线持久订阅者
Posted
技术标签:
【中文标题】如何让我的 ActiveMQ 代理删除离线持久订阅者【英文标题】:How do I make my ActiveMQ broker drop offline durable subscribers 【发布时间】:2017-03-24 14:19:34 【问题描述】:我们有一个 ActiveMQ 代理,它使用 JMS、AMQP 和 MQTT 从非常不同的客户端连接。出于某种原因,我们还没有弄清楚一组特定的 MQTT 客户端经常(并非总是)持久订阅。这是一个测试环境,客户端经常被添加和删除,后者有时通过拔掉插头或重新启动嵌入式设备,这样他们就无法正确取消订阅。效果 (IIUC) 是代理为可能永远不会再看到的设备堆积“离线持久订阅”(我可以在http://my_broker:8161/admin/subscribers.jsp 下看到这些),永远保留关于这些主题的消息,直到它最终崩溃自己内存占用。
这里的问题是订阅者持续订阅,我们需要找出为什么会这样。然而,我们也决定这样做的客户(无意中)不应该让经纪人陷入停顿,所以我们需要独立解决这个问题。
我找到了there are settings for a timeout for offline durable subscriptions 并将它们放入我们的代理配置中(最后两行):
<broker
xmlns="http://activemq.apache.org/schema/core"
brokerName="my_broker"
dataDirectory="$activemq.data"
useJmx="true"
advisorySupport="false"
persistent="false"
offlineDurableSubscriberTimeout="1800000"
offlineDurableSubscriberTaskSchedule="60000">
如果我理解正确,上述内容应每分钟检查一次,并解雇半小时未见的客户。但是,与文档相反,这似乎不起作用:我订阅的消费者几天前拔掉插头仍然在离线持久订阅者列表中可见,代理的内存占用不断增加,如果我在代理的 Web 界面中手动删除订阅者 我可以看到内存占用减少了。
所以这是我的问题:
-
什么决定了对 ActiveMQ 代理上主题的 MQTT 订阅是否持久?
我在 ActiveMQ 设置中设置删除离线持久订阅的超时时做错了什么?
【问题讨论】:
您是否尝试过相反的方法,即发布具有较短 TTL 周期(生存时间)的消息并配置较短的 expireMessagesPeriod?根据文档,使用此配置,系统必须在 TTL 期限到期后清除所有此类消息,这对于长期丢失的持久订阅者(未取消订阅)无关紧要。这也应该有助于我们释放内存资源,因为实际消耗的内存是用于存储“消息”,而不是用于存储订阅者对象本身。 【参考方案1】:我提取了删除超时持久订阅的相关代码 (doCleanup()
)。
在成功的情况下,它执行:
LOG.info("Destroying durable subscriber due to inactivity: ", sub);
在失败的情况下,它执行:
LOG.error("Failed to remove inactive durable subscriber", e);
在您的日志文件中查找上述日志行,并将其与您使用 admin/subscribers.jsp 查看器观察到的详细信息进行匹配。如果它不打印任何行,则订阅可能出于某种原因仍保留为active
,或者您可能偶然发现了一个错误。
另外,如果可以,您能否尝试删除代理名称中的下划线 (_)?该手册讨论了代理名称中的下划线问题。
代码:
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory)
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1)
this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
this.cleanupTask = new TimerTask()
@Override
public void run()
doCleanup();
;
this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
public void doCleanup()
long now = System.currentTimeMillis();
for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet())
DurableTopicSubscription sub = entry.getValue();
if (!sub.isActive())
long offline = sub.getOfflineTimestamp();
if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout())
LOG.info("Destroying durable subscriber due to inactivity: ", sub);
try
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(entry.getKey().getClientId());
info.setSubscriptionName(entry.getKey().getSubscriptionName());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(entry.getKey().getClientId());
removeSubscription(context, info);
catch (Exception e)
LOG.error("Failed to remove inactive durable subscriber", e);
// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString()
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
【讨论】:
感谢您回答这个问题。我们的专业支持选项出现了两个事实:1) ActiveMQ 中确实存在一个错误,它无法删除离线持久订阅者。 2) 为了从 MQTT 进行非持久订阅,您需要将cleanSession
设置为 true
并使用 QoS
以上是关于如何让我的 ActiveMQ 代理删除离线持久订阅者的主要内容,如果未能解决你的问题,请参考以下文章