Azure 事件中心 - 如何在 .Net Core WebAPI 中实现消费者?

Posted

技术标签:

【中文标题】Azure 事件中心 - 如何在 .Net Core WebAPI 中实现消费者?【英文标题】:Azure Event Hubs - How to implement a consumer in .Net Core WebAPI? 【发布时间】:2020-09-02 12:55:00 【问题描述】:

因此,我需要在 WebAPI (.Net core 3.1) 应用程序中实现消费者,阅读 Microsoft 文档并观看有关它的几个视频,我得到了这个解决方案。

这是 IServiceCollection 的扩展方法,我从 Startup.cs 调用它来实例化我的 Consumer(连接字符串和容器名称仅用于测试):

    private static async Task AddPropostaEventHub(this IServiceCollection services)
    
        const string eventHubName = "EVENT HUB NAME";
        const string ehubNamespaceConnectionString = "EVENT HUB CONNECTION STRING";
        const string blobContainerName = "BLOB CONTAINER NAME";
        const string blobStorageConnectionString = "BLOB CONNECTION STRING";

        string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
        BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
        EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

        processor.ProcessEventAsync += ProcessEvent.ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessEvent.ProcessErrorHandler;

        await processor.StartProcessingAsync();
    

ProcessorEventHandler 类:

public static class ProcessEvent

    public static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    
        var result = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());

        //DO STUFF
        await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    

    public static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    
        //DO STUFF
        return Task.CompletedTask;
    

这段代码可以运行,但我的问题是:这样实现它可以吗?如果消费者从不停止,会有问题吗?它可以阻止我的代码中的其他任务(或请求)吗?

有没有更好的方法在 .Net Core 中使用 Dependecy Injection 来实现它?

我找不到任何人在 WebApi 中实现的示例,这是有原因的吗?

【问题讨论】:

您能否帮助我从业务环境中更好地理解您要解决的方案?正如您所指出的,处理器通常是您希望持续运行的东西,而不是在每个请求的瞬态容量中运行,这将使托管作为 Web 应用程序的一部分不是理想的选择。 @JesseSquire,场景就像你描述的那样。有人说在WebAPI中实现作为中间件消费者(比如ServiceBus可以运行),但我不知道这是否是实现它的最佳方式,或者我是否应该在我的Api之外创建一个处理器工作。 如果不更深入地了解应用程序上下文,就很难概括推荐。你想对这些事件做什么?它们对您的应用程序/业务意味着什么?阅读它们的时机有多敏感? @JesseSquire 我需要捕获事件并将它们发送到传统的 WebService (SOAP),基本上。这个过程非常基础,但对我们的业务非常重要。由于与 StreamSets 框架的集成,我们使用 EventHub。但我认为我们可以使用 ServiceBus。 无论您使用的是事件中心还是服务总线,WebAPI 都绝对不是正确的主机。无论哪种情况,您都需要一个平台,您可以在该平台上持久运行并在代理公开事件/消息时处理它们,或者您可以按计划运行,检查消息,读取直到代理为空,然后回去睡觉。是否可以使用服务进程来托管或 Azure 函数?是什么让您考虑为此使用 Web 应用程序? 【参考方案1】:

正如 Jesse Squire 所提到的,WebAPI 不一定是正确的实施方法,但这主要取决于您的目标是什么。

如果您要创建的 API 还包括事件中心侦听器,则应在 IHostedService interface 下实现它。您现有的 AddPropostaEventHub() 方法位于接口的 StartAsync(CancellationToken cancellationToken) 中,这是 .NET Core 用来启动后台任务的方法。然后,在 Startup.cs 中,将处理程序注册为 services.AddHostedService<EventHubService>();。这可确保正确处理长时间运行的侦听器,而不会阻塞传入的 HTTP 请求。

如果您还没有涉及 API 或能够完全拆分流程,那么您应该考虑将其创建为控制台应用程序而不是托管服务,这进一步分离了 API 和事件侦听器的角色。

你没有提到你在哪里部署它,但如果你碰巧部署到 Azure 应用服务,你确实有一些选项可以将接收器从你的 API 中分离出来,在这种情况下,我肯定会建议你这样做所以。在 App Services 内部,有一个名为 WebJobs 的功能专门用于处理此类事情,而不会将其作为 API 的一部分。一个不错的选择是Functions。在这种情况下,您根本不必担心为 Event Hub 设置 DI,主机进程会为您处理。

【讨论】:

我通过使用 excuteAsync 方法扩展 BackgroundService 来使用相同的方法。但是我的处理程序仅通过 https 呼叫接听电话。它不是在我的 webapi 项目中独立运行,因为它在控制台应用程序中工作。在我的情况下,我的 api 向事件中心发送了一些事件,我已经在我的后台服务中实现了它的使用者,但是这个使用者阻止了 https api 请求,因为它在 api 的同步中运行。困惑于解决这个问题。

以上是关于Azure 事件中心 - 如何在 .Net Core WebAPI 中实现消费者?的主要内容,如果未能解决你的问题,请参考以下文章

如何捕获来自事件中心的错误 json 记录到 azure 流分析

Azure 事件中心事件属性如何序列化?

Azure 事件中心偏移

使用 Python Qpid/Proton/Messenger(),如何过滤来自 Azure 事件中心的消息?

如何有效地将压缩的 json 数据推送到 azure 事件中心并在 azure 流分析中处理?

Azure 事件中心的 HTTPS 端点