如何在 ASP.NET Core 应用程序中持续监听 Pub/Sub 消息?

Posted

技术标签:

【中文标题】如何在 ASP.NET Core 应用程序中持续监听 Pub/Sub 消息?【英文标题】:How to listen for Pub/Sub messages in an ASP.NET Core app continuously? 【发布时间】:2018-03-29 03:23:10 【问题描述】:

我想实现一个 ASP.NET Core API,它不响应 HTTP 请求,但在启动时开始侦听 Google Cloud Pub/Sub 消息,并且在整个生命周期中无限期地侦听。

使用官方 Pub/Sub SDK 实现此功能的首选方式是什么?

我可以想到两种方法:

方法1:只需使用SimpleSubscriber,并在Startup.Configure 开始收听消息:

public void Configure(IApplicationBuilder app)

    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
    var receivedMessages = new List<PubsubMessage>();

    simpleSubscriber.StartAsync((msg, cancellationToken) =>
    
        // Process the message here.

        return Task.FromResult(SimpleSubscriber.Reply.Ack);
    );

    ...

方法 2:使用专门创建的库来定期运行作业,例如 Quartz、Hangfire 或 FluentScheduler,每次触发作业时,使用 SubscriberClient 拉取新消息。

哪种方法是首选方法?第一个看起来比较简单,但我不确定它是否真的可靠。

【问题讨论】:

@Flater 这是一个在 Kubernetes 中运行的 ASP.NET Core 应用程序。我希望该应用程序同时具有一些 REST 端点,继续收听一些 Pub/Sub 消息。 (我知道我可以将这两件事分成两个部分,但如果可能的话,为了方便起见,我想将其保留为一个。) 我会从最简单的方法开始,然后如果需要,转移到图书馆。在您的示例中,我只会添加代码以将 simpleSubscriber 保留在静态字段中的某个位置,以保护对象免受 GC 据我知道,第一种方法应该没问题 - 但我正在咨询一位了解更多的同事。 【参考方案1】:

第一种方法肯定是打算如何使用它。

但是,请参阅StartAsync 的文档:

开始接收消息。返回的Task 在以下任一情况下完成 StopAsync(CancellationToken) 被调用或 如果不可恢复 发生故障。此方法不能多次调用 SubscriberClient 实例。

因此,您确实需要处理因不可恢复错误而导致的意外 StartAsync 关机。最简单的做法是使用外部循环,尽管考虑到这些错误被认为是不可恢复的,因此可能需要更改调用的某些内容才能成功。

代码可能如下所示:

while (true)

    // Each SubscriberClientinstance must only be used once.
    var subscriberClient = await SubscriberClient.CreateAsync(subscriptionName);
    try
    
        await subscriberClient.StartAsync((msg, cancellationToken) =>
        
            // Process the message here.
            return Task.FromResult(SimpleSubscriber.Reply.Ack);
        );
    
    catch (Exception e)
    
        // Handle the unrecoverable error somehow...
    

如果这没有按预期工作,请let us know。

编辑SimpleSubscriber 在库中已重命名为 SubscriberClient,因此已对答案进行了相应编辑。

【讨论】:

感谢您的信息!通过不可恢复的错误,您的意思是堆栈溢出或内存不足,因此它不会由于暂时的网络故障或类似的事情而停止,对吗?顺便提一句。监听是如何实现的,SimpleSubscriber 是在内部轮询,还是 Pub/Sub 支持 WebSocket 连接之类的东西? @MarkVincze “不可恢复”表示不可恢复的 RPC 错误。 可恢复 RPC 错误列表在这里:github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/… 所有其他错误都是不可恢复的。 请注意,SimpleSubscriber 已重命名为 SubscriberClient。新的 GitHub 链接:github.com/googleapis/google-cloud-dotnet/blob/master/apis/…

以上是关于如何在 ASP.NET Core 应用程序中持续监听 Pub/Sub 消息?的主要内容,如果未能解决你的问题,请参考以下文章

ASP.NET Core MVC:设置身份 cookie 过期

Asp.Net Core 中IdentityServer4 实战之 Claim详解

ASP.NET Core中的缓存[1]:如何在一个ASP.NET Core应用中使用缓存

Docker打包 Asp.Net Core应用,在CentOS上运行

如何在 ASP.Net Core 中使用 IHostedService

如何使用 EF Core 在 ASP.NET Core 中取消应用迁移