为啥 RegisterMessageHandler 不适用于特定主题名称?

Posted

技术标签:

【中文标题】为啥 RegisterMessageHandler 不适用于特定主题名称?【英文标题】:Why does RegisterMessageHandler not work for a specific topic name?为什么 RegisterMessageHandler 不适用于特定主题名称? 【发布时间】:2020-06-21 03:08:39 【问题描述】:

我不明白为什么下面引用的以下处理程序 (processMessageAsync) 没有被特定主题名称触发,但其他主题名称成功:

subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

以下是我的订阅者类:

open System
open System.Linq
open System.Threading
open System.Text
open System.Threading.Tasks
open Microsoft.Azure.ServiceBus

type Subscriber(connectionString:string, topic:string, subscription:string) =

    let mutable subscriptionClient : SubscriptionClient = null

    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        printfn "Got an exception: %A" args.Exception
        Task.CompletedTask

    let processMessageAsync (message:Message) (_:CancellationToken) = 

        try

            let _ = Encoding.UTF8.GetString(message.Body)
            subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously

            Task.CompletedTask

        with
            _ -> Task.CompletedTask

    member x.Listen() =

        async 

            subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription)
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(3.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
            msgOptions.AutoComplete         <- false
            msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
            msgOptions.MaxConcurrentCalls   <- 1

            subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
        

    member x.CloseAsync() =

        async 

            do! subscriptionClient.CloseAsync() |> Async.AwaitTask
        

这是我尝试运行 订阅者的方式:

open System
open Subscription.Console

let connectionString = <connection_string>

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    let topic,subscription = "Topic.courier-accepted","Subscription.all-messages"
    let subscriber = Subscriber(connectionString, topic, subscription)

    async  do! subscriber.Listen()
           |> Async.RunSynchronously

    Console.ReadKey() |> ignore

    async  do! subscriber.CloseAsync()
           |> Async.RunSynchronously

    0 // return an integer exit code 

以下代码发布我的订阅者应该收到(但没有)的消息:

[<Fact>]
let ``Publish courier-accepted to servicebus``() =

    async 

        // Setup
        let  client    = TopicClient(sbConnectionstring, "Topic.courier-accepted")
        let! requestId = requestId()

        let updated = requestId |> modifyRequestId someCourierResponse
        let json    = JsonConvert.SerializeObject(updated)
        let message = Message(Encoding.UTF8.GetBytes(json))

        message.Label <- sprintf "request-id(%s)" (requestId.ToString())

        // Test
        do! client.SendAsync(message) |> Async.AwaitTask

        // Teardown
        do! client.CloseAsync()       |> Async.AwaitTask
    

注意:

上面代码的有趣之处在于,当我运行 Azure Function 并将 ServiceBusTrigger 设置为相同的主题和订阅名称时,每次运行测试时都会触发 Azure Function。

我没有收到任何异常消息 exceptionReceivedHandler 函数永远不会在我的订阅者实例上触发 我没有在我的 Azure 仪表板上观察到 servicebus 资源的任何用户错误

使用不同的主题名称成功

如果我将主题名称更改为“courier-requested”,那么订阅者实例会收到消息:

[<Fact>]
let ``Publish courier-requested to servicebus topic``() =

    // Setup
    let client    = TopicClient(sbConnectionstring, "Topic.courier-requested")
    let message   = Message(Encoding.UTF8.GetBytes(JsonFor.courierRequest))
    message.Label <- sprintf "courier-id(%s)" "b965f552-31a4-4644-a9c6-d86dd45314c4"

    // Test
    async 

        do! client.SendAsync(message) |> Async.AwaitTask
        do! client.CloseAsync()       |> Async.AwaitTask
    

这里是主题名称调整的订阅:

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    let topic,subscription = "Topic.courier-requested","Subscription.all-messages"
    let subscriber = Subscriber(connectionString, topic, subscription)

    async  do! subscriber.Listen()
           |> Async.RunSynchronously

    Console.ReadKey() |> ignore

    async  do! subscriber.CloseAsync()
           |> Async.RunSynchronously

    0 // return an integer exit code

这是我的 Azure 门户中的两个主题:

在 Portal 中点击主题有不同的结果:

我注意到我必须点击两次“接受快递”才能查看它的订阅。但是,我可以单击“快递请求”一次并立即查看其订阅。

【问题讨论】:

I don't receive any exception messages,也许是因为你正在接受异常?我在 try 之后看到了 with _ 我为 courier-accepted 主题创建了一个附加订阅,启动了订阅值与我刚刚在门户中注册的订阅值匹配的订阅者,然后删​​除了最近创建的订阅后确实观察到了一个异常当订阅者仍在运行时。 我无法重现您的问题,您似乎遇到了服务总线门户上的异常行为。也许您可以向 SB 团队提交支持票。 我昨天提交了一张票。 【参考方案1】:

如果我理解正确,您尝试删除有问题的主题并重新创建它,这听起来像是 Azure 中的一个小问题。您不应该得到上述需要单击两次的行为。有时我在 Azure 中创建了一些东西,但他们的基础设施下游某处出现问题,而支持请求是解决问题的唯一方法。

【讨论】:

以上是关于为啥 RegisterMessageHandler 不适用于特定主题名称?的主要内容,如果未能解决你的问题,请参考以下文章

为啥使用 glTranslatef?为啥不直接更改渲染坐标?

为啥 DataGridView 上的 DoubleBuffered 属性默认为 false,为啥它受到保护?

为啥需要softmax函数?为啥不简单归一化?

为啥 g++ 需要 libstdc++.a?为啥不是默认值?

为啥或为啥不在 C++ 中使用 memset? [关闭]

为啥临时变量需要更改数组元素以及为啥需要在最后取消设置?