当它已经超时时,我们如何避免多个 Rebus 消息?

Posted

技术标签:

【中文标题】当它已经超时时,我们如何避免多个 Rebus 消息?【英文标题】:How can we avoid multiple Rebus messages when its has been timed out? 【发布时间】:2014-08-07 10:41:40 【问题描述】:

我们使用 Rebus 作为 Sql 服务器的队列系统。我们有多个不同类型消息的收件人。每条消息都可以由特定类型的多个工作人员处理。 一条消息只能由一个工人(第一个拉它的工人)处理/处理。如果工作人员由于某种原因无法完成它,它会使用超时服务推迟消息。

如果我理解正确的话,它会变成一个 TimeoutRequest 并放入 超时表。当需要重新运行时,它会在作为原始消息重新引入队列之前成为 TimeoutReply。

我们遇到的问题是,当它变成 TimeoutReply 时,所有工作人员都会将其拾取并创建原始消息。一条原始消息在超时时会变成多条消息(与工人一样多)。

我们的 Rebus 设置如下:

“服务器端”:

        var adapter = new BuiltinContainerAdapter();
        Configure.With(adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
            .CreateBus()
            .Start();

        return adapter;

“工人端”:

        _adapter = new BuiltinContainerAdapter();
        Configure.With(_adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
                .EnsureTableIsCreated())
            .Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
            .Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
            .Behavior(x => x.SetMaxRetriesFor<Exception>(0))
            .Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
            .CreateBus().Start(numberOfWorkers);

非常感谢您在解决问题或提供理解方面的任何帮助!

【问题讨论】:

【参考方案1】:

我能想象你最终会收到多个超时回复的唯一原因是因为每个工作人员都充当超时管理器,而且他们似乎共享相同的存储空间。

这样,由于超时管理器在查询到期超时时不使用任何类型的锁定或任何东西,它们最终可能会抢夺相同的到期超时,这反过来会导致多个超时回复 - 因为存在竞争条件,但它没有被注意到,因为this SQL 没有注意到一行是否被实际删除)。

我建议您 a) 为工作人员使用单独的超时表(例如 _inputQueue + ".timeouts"),或 b) 让所有工作人员使用外部超时管理器(即通过省略 Timeouts(x =&gt; ...) 事物并启动独立的专用超时管理器。

在你的场景中,我猜 (a) 是最简单的方法,因为它非常接近你现在所拥有的。

我更喜欢 (b) 我自己,通常每台托管 Rebus 端点的机器都有一个超时管理器。

如果这能解决您的问题,请告诉我。

另外,我很想知道 SQL 传输是如何为您工作的 :)

【讨论】:

感谢您的宝贵反馈。您描述的场景可能是导致它的原因。我正在研究使用(b)。当我查看配置文件时,似乎我们需要为每个输入队列(收件人类型)设置一个超时管理器 如果你使用专用的超时管理器,你只需要一个 - 如果你不配置超时管理器端点地址,默认为rebus.timeout(假设为本地) 可以使用the configuration section配置超时管理器地址 再次感谢。我报告了一个关于外部超时管理器的错误。希望你有时间研究一下。 github.com/rebus-org/Rebus/issues/241 不是一个错误,而是一个遗漏,或多或少是故意的。我已经详细说明了in my comment

以上是关于当它已经超时时,我们如何避免多个 Rebus 消息?的主要内容,如果未能解决你的问题,请参考以下文章

会话超时时如何自动刷新

Xcode 服务器 - 尝试设置 AX 消息传递超时时超时

Azure Queues dequeue 计数随着 Rebus 核心增长

Tomcat会话超时时怎样记录操作日志,满足安全审计要求

Vuetify 小吃吧超时时更新父状态

AWS Lambda超时时获取通知