面向.NET开发人员的Dapr——发布和订阅

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面向.NET开发人员的Dapr——发布和订阅相关的知识,希望对你有一定的参考价值。

目录:

The Dapr publish & subscribe building block

Dapr 发布 & 订阅构建块

The Publish-Subscribe pattern (often referred to as "pub/sub") is a well-known and widely used messaging pattern. Architects commonly embrace it in distributed applications. However, the plumbing to implement it can be complex. There are often subtle feature differences across different messaging products. Dapr offers a building block that significantly simplifies implementing pub/sub functionality.

发布-订阅模式 (通常称为 "发布/订阅" ) 是众所周知且广泛使用的消息模式。架构师通常在分布式应用程序中采用它。但是,实现可能会很复杂。在不同的消息队列产品中,通常会有细微的功能差异。Dapr 提供了一个构建基块,可显著简化实现发布/订阅功能。

What it solves

解决方法

The primary advantage of the Publish-Subscribe pattern is loose coupling, sometimes referred to as temporal decoupling. The pattern decouples services that send messages (the publishers) from services that consume messages (the subscribers). Both publishers and subscribers are unaware of each other - both are dependent on a centralized message broker that distributes the messages.

发布/订阅 模式的主要优点是松耦合,有时称为时间上解耦。此模式分离服务为发送消息的服务(称为发布者)和消费消息的服务(称为订阅者)。发布者和订阅者都不知道对方的存在,两者都依赖于分发消息的集中式消息代理。

Figure 7-1 shows the high-level architecture of the pub/sub pattern.

图7-1 显示了发布/订阅模式的高层架构。

Figure 7-1. The pub/sub pattern.

图 7-1。发布/订阅模式。

From the previous figure, note the steps of the pattern:

  1. Publishers send messages to the message broker.

  2. Subscribers bind to a subscription on the message broker.

  3. The message broker forwards a copy of the message to interested subscriptions.

  4. Subscribers consume messages from their subscriptions.

从上图中,注意模式的步骤:

  1. 发布者将消息发送到消息代理。

  2. 订阅者绑定到消息代理上的订阅。

  3. 消息代理将消息的副本转发给感兴趣的订阅。

  4. 订阅者从其订阅消费消息。

Most message brokers encapsulate a queueing mechanism that can persist messages once received. With it, the message broker guarantees durability by storing the message. Subscribers don't need to be immediately available or even online when a publisher sends a message. Once available, the subscriber receives and processes the message. Dapr guarantees At-Least-Once semantics for message delivery. Once a message is published, it will be delivered at least once to any interested subscriber.

大多数消息代理封装了一个排队机制,这种机制可以保证在收到消息后可以持久保存消息的。利用它,消息代理通过存储消息来保证 持久性 。当发布者发送消息时,订阅者无需立即就绪,甚至不需要在线。一旦订阅者就绪,订阅者将接收并处理消息。Dapr 为消息传递 提供至少一次的语义保证。发布消息后,该消息将至少传递一次到任何相关订阅者。

If your service can only process a message once, you'll need to provide an idempotency check to ensure that the same message is not processed multiple times. While such logic can be coded, some message brokers, such as Azure Service Bus, provide built-in messaging capabilities.

如果要求服务只能处理一次消息,则需要提供 幂等性检查 ,确保不会多次处理同一消息。虽然可以编码这种逻辑,但某些消息代理(如 Azure Service Bus)提供内置的消息传递 重复检测 功能。

There are several message broker products available - both commercially and open-source. Each has advantages and drawbacks. Your job is to match your system requirements to the appropriate broker. Once selected, it's a best practice to decouple your application from message broker plumbing. You achieve this functionality by wrapping the broker inside an abstraction. The abstraction encapsulates the message plumbing and exposes generic pub/sub operations to your code. Your code communicates with the abstraction, not the actual message broker. While a wise decision, you'll have to write and maintain the abstraction and its underlying implementation. This approach requires custom code that can be complex, repetitive, and error-prone.

有多种可用的消息代理产品-商业和开源。各有优缺点。你需要按系统要求选择相应的代理。选择后,最佳做法是分离应用程序与消息代理系统。可以通过抽象 代理来实现此功能。抽象封装消息代理并向代码公开通用的发布/订阅操作。你的代码与抽象交互,而不是实际的消息代理。明智的决定是,你必须编写和维护抽象及其底层实现。此方法需要自定义代码,这些代码可能会很复杂、重复并且容易出错。

The Dapr publish & subscribe building block provides the messaging abstraction and implementation out-of-the-box. The custom code you would have had to write is prebuilt and encapsulated inside the Dapr building block. You bind to it and consume it. Instead of writing messaging plumbing code, you and your team focus on creating business functionality that adds value to your customers.

Dapr 发布 & 订阅构建块提供开箱即用的消息传递抽象和实现。以前您必须编写的自定义代码已在 Dapr 构建基块内预构建和封装。绑定到并使用这些封装。你和你的团队只需聚焦于创建能为客户带来价值的业务功能上,而无需再编写消息管道代码。

How it works

工作原理

The Dapr publish & subscribe building block provides a platform-agnostic API framework to send and receive messages. Your services publish messages to a named topic. Your services subscribe to a topic to consume messages.

Dapr 发布 & 订阅构建块提供了平台无关的 API 框架来发送和接收消息。你的服务将消息发布到一个命名 主题。你的服务订阅主题来消费消息。

The service calls the pub/sub API on the Dapr sidecar. The sidecar then makes calls into a pre-defined Dapr pub/sub component that encapsulates a specific message broker product. Figure 7-2 shows the Dapr pub/sub messaging stack.

服务在 Dapr 边车上调用 pub/sub API。然后,边车将调用一个预定义的 Dapr pub/sub 组件(封装了特定的消息代理产品)。图7-2 显示了 Dapr pub/sub 消息传递栈。

 

Figure 7-2. The Dapr pub/sub stack.

图 7-2。Dapr pub/sub 栈。

The Dapr publish & subscribe building block can be invoked in many ways.

可以通过多种方式调用 Dapr 发布 & 订阅构建块。

At the lowest level, any programming platform can invoke the building block over HTTP or gRPC using the Dapr native API. To publish a message, you make the following API call:

在最低级别,任何编程平台均可通过 HTTP 或 gRPC 使用 Dapr 本机 API 来调用构建块。若要发布消息,请执行以下 API 调用:

http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

There are several Dapr specific URL segments in the above call:

  • <dapr-port> provides the port number upon which the Dapr sidecar is listening.

  • <pub-sub-name> provides the name of the selected Dapr pub/sub component.

  • <topic> provides the name of the topic to which the message is published.

以上调用中有几个 Dapr 特定的 URL 段:

  • <dapr-port> 提供 Dapr 边车正在侦听的端口号。

  • <pub-sub-name> 提供选定的 Dapr pub/sub 组件的名称。

  • <topic> 提供消息要发布到的主题的名称。

Using the curl command-line tool to publish a message, you can try it out:

可以尝试一下使用 curl 的命令行工具发布消息:

curl -X POST http://localhost:3500/v1.0/publish/pubsub/newOrder \\
  -H "Content-Type: application/json" \\
  -d '{ "orderId": "1234", "productId": "5678", "amount": 2 }'

You receive messages by subscribing to a topic. At startup, the Dapr runtime will call the application on a well-known endpoint to identify and create the required subscriptions:

通过订阅主题来接收消息。在启动时,Dapr 运行时将调用应用程序上的已知终结点来识别和创建所需的订阅:

http://localhost:<appPort>/dapr/subscribe
  • <appPort> informs the Dapr sidecar of the port upon which the application is listening.

  • <appPort> 通知 Dapr 边车应用程序侦听的端口。

You can implement this endpoint yourself. But Dapr provides more intuitive ways of implementing it. We'll address this functionality later in this chapter.

可以自行实现此终结点。但 Dapr 提供了更直观的实现方法。本章稍后将介绍此功能。

The response from the call contains a list of topics to which the applications will subscribe. Each includes an endpoint to call when the topic receives a message. Here's an example of a response:

对此终结点调用的响应包含应用程序将订阅的主题的列表。每个都包括在主题收到消息时要调用的终结点。下面是响应的示例:

[
  {
    "pubsubname": "pubsub",
    "topic": "newOrder",
    "route": "/orders"
  },
  {
    "pubsubname": "pubsub",
    "topic": "newProduct",
    "route": "/productCatalog/products"
  }
]

In the JSON response, you can see the application wants to subscribe to topics newOrder and newProduct. It registers the endpoints /orders and /productCatalog/products for each, respectively. For both subscriptions, the application is binding to the Dapr component named pubsub.

在 JSON 响应中,可以看到应用程序要订阅主题 newOrder 和 newProduct 。分别为每个终结点注册终结点 /orders和 /productCatalog/products 。对于这两个订阅,应用程序将绑定到名为的 Dapr 组件 pubsub 。

Figure 7-3 presents the flow of the example.

图7-3 显示了该示例的消息流。

Figure 7-3. pub/sub flow with Dapr.

图 7-3。Dapr 的发布/订阅流。

From the previous figure, note the flow:

  1. The Dapr sidecar for Service B calls the /dapr/subscribe endpoint from Service B (the consumer). The service responds with the subscriptions it wants to create.

  2. The Dapr sidecar for Service B creates the requested subscriptions on the message broker.

  3. Service A publishes a message at the /v1.0/publish/<pub-sub-name>/<topic> endpoint on the Dapr Service A sidecar.

  4. The Service A sidecar publishes the message to the message broker.

  5. The message broker sends a copy of the message to the Service B sidecar.

  6. The Service B sidecar calls the endpoint corresponding to the subscription (in this case /orders) on Service B. The service responds with an HTTP status-code 200 OK so the sidecar will consider the message as being handled successfully.

在上图中,请注意以下 消息流:

  1. 服务B的Dapr边车调用服务B(订阅者)的终结点/dapr/subscribe 。服务会返回它要创建的订阅作为响应。

  2. 服务 B 的 Dapr 边车在消息代理上创建要求的订阅。

  3. 服务 A 在其Dapr边车的终结点/v1.0/publish/<pub-sub-name>/<topic> 上发布一条消息。

  4. 服务 A 的Dapr边车将消息发布到消息代理。

  5. 消息代理发送消息副本到服务 B 的Dapr边车。

  6. 服务 B 的Dapr边车调用服务 b 上与订阅对应的终结点(此处为 /orders)。服务以 HTTP 状态码 200 OK 进行响应,边车将认为消息已被成功处理。

In the example, the message is handled successfully. But if something goes wrong while Service B is handling the request, it can use the response to specify what needs to happen with the message. When it returns an HTTP status-code 404, an error is logged and the message is dropped. With any other status-code than 200 or 404, a warning is logged and the message is retried. Alternatively, Service B can explicitly specify what needs to happen with the message by including a JSON payload in the body of the response:

示例中,消息已被成功处理。但是,如果服务 B 处理请求时出现问题,则可以使用响应指定需要对消息执行的操作。当它返回 HTTP 状态码404时 ,将记录错误并丢弃消息。对于任何其他状态码(非200 404),将记录警告,并重试消息。或者,服务 B 可以通过在响应正文中包含 JSON 负荷,显式指定需要对消息执行的操作。

{
  "status": "<status>"
}

The following table shows the available status values:

TABLE 1
StatusAction
SUCCESSThe message is considered as processed successfully and dropped.
RETRYThe message is retried.
DROPA warning is logged and the message is dropped.
Any other statusThe message is retried.

下表显示了 status的可用 值:

表 1
状态操作
SUCCESS消息被视为已成功处理和丢弃。
RETRY重试消息。
DROP将记录警告,并丢弃消息。
任何其他状态重试消息。

Competing consumers

消费者竞争

When scaling out an application that subscribes to a topic, you have to deal with competing consumers. Only one application instance should handle a message sent to the topic. Luckily, Dapr handles that problem. When multiple instances of a service with the same application-id subscribe to a topic, Dapr delivers each message to only one of them.

横向扩展订阅某个主题的应用程序时,必须处理消费者竞争。只有一个应用程序实例应处理发送到主题的消息。幸运的是,Dapr 处理这一问题。当具有相同应用程序 id 的服务的多个实例订阅主题时,Dapr 仅将每条消息传递给其中的一个。

SDKs

Making HTTP calls to the native Dapr APIs is time-consuming and abstract. Your calls are crafted at the HTTP level, and you'll need to handle plumbing concerns such as serialization and HTTP response codes. Fortunately, there's a more intuitive way. Dapr provides several language-specific SDKs for popular development platforms. At the time of this writing, Go, Node.js, Python, .NET, Java, and javascript are available.

对本机 Dapr Api 进行 HTTP 调用非常耗时且更抽象。你的调用是HTTP 级别的,你将需要处理诸如序列化和 HTTP 响应代码这样的管道相关问题。幸运的是,有一种更直观的方式。Dapr 为常用开发平台提供多种语言特定的 Sdk。撰写本文时,可以使用 Node.js、Python、.NET、Java 和 JavaScript。

Use the Dapr .NET SDK

使用 Dapr .NET SDK

For .NET Developers, the Dapr .NET SDK provides a more productive way of working with Dapr. The SDK exposes a DaprClient class through which you can directly invoke Dapr functionality. It's intuitive and easy to use.

对于 .NET 开发人员而言, Dapr .NET SDK 提供了更高效的 Dapr 处理方式。SDK 公开了一个 DaprClient 类,通过该类可以直接调用 Dapr 功能,直观且易于使用。

To publish a message, the DaprClient exposes a PublishEventAsync method.

DaprClient 公开了一个 PublishEventAsync 方法来发布消息 。

var data = new OrderData
{
  orderId = "123456",
  productId = "67890",
  amount = 2
};

var daprClient = new DaprClientBuilder().Build();

await daprClient.PublishEventAsync<OrderData>("pubsub", "newOrder", data);
  • The first argument pubsub is the name of the Dapr component that provides the message broker implementation. We'll address components later in this chapter.

  • The second argument neworder provides the name of the topic to send the message to.

  • The third argument is the payload of the message.

  • You can specify the .NET type of the message using the generic type parameter of the method.

  • 第一个参数 pubsub 是提供消息代理实现的 Dapr 组件的名称。本章稍后将介绍这些组件。

  • 第二个参数 neworder 提供要向其发送消息的主题的名称。

  • 第三个参数是消息的载体。

  • 您可以使用方法的泛型类型参数来指定消息的 .NET 类型。

To receive messages, you bind an endpoint to a subscription for a registered topic. The AspNetCore library for Dapr makes this trivial. Assume, for example, that you have an existing ASP.NET WebAPI action method entitled CreateOrder:

若要接收消息,请将终结点绑定到订阅。用于 Dapr 的 AspNetCore 库使此变得简单。例如,假设你有一个名为 CreateOrder 的 ASP.NET WebAPI 操作方法 :

[HttpPost("/orders")]
public async Task<ActionResult> CreateOrder(Order order)

You must add a reference to the Dapr.AspNetCore NuGet package in your project to consume the Dapr ASP.NET Core integration.

必须在项目中添加对 Dapr.AspNetCore NuGet 包的引用,才能使用 Dapr ASP.NET Core 集成。

To bind this action method to a topic, you decorate it with the Topic attribute:

若要将此操作方法绑定到主题,请使用Topic特性 对其进行修饰 :

[Topic("pubsub", "newOrder")]
[HttpPost("/orders")]
public async Task<ActionResult> CreateOrder(Order order)

You specify two key elements with this attribute:

  • The Dapr pub/sub component to target (in this case pubsub).

  • The topic to subscribe to (in this case newOrder).

指定此特性的两个关键元素:

  • Dapr的发布/订阅组件(此处为pubsub)

  • 订阅的主题 (此处为 newOrder) 。

Dapr then invokes that action method as it receives messages for that topic.

Dapr 将调用该操作方法,以接收该主题的消息。

You'll also need to enable ASP.NET Core to use Dapr. The Dapr .NET SDK provides several extension methods that can be invoked in the Startup class.

还需要启用 ASP.NET Core 来使用 Dapr。Dapr .NET SDK 提供了可在Startup 类中调用的多个扩展方法 。

In the ConfigureServices method, you must add the following extension method:

在 ConfigureServices 方法中,需要添加以下扩展方法:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    services.AddControllers().AddDapr();
}

Appending the AddDapr extension-method to the AddControllers extension-method registers the necessary services to integrate Dapr into the MVC pipeline. It also registers a DaprClient instance into the dependency injection container, which then can be injected anywhere into your service.

将 AddDapr 扩展方法追加到 AddControllers 扩展方法会注册必要的服务,以将 Dapr 集成到 MVC 管道中。它还将 DaprClient 实例注册到依赖关系注入容器,将来可以在任何需要DaprClient的服务中注入它。

In the Configure method, you must add the following middleware components to enable Dapr:

在 Configure 方法中,必须添加以下中间件组件来启用 Dapr:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ...
    app.UseCloudEvents();

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapSubscribeHandler();
        // ...
    });
}

The call to UseCloudEvents adds CloudEvents middleware into to the ASP.NET Core middleware pipeline. This middleware will unwrap requests that use the CloudEvents structured format, so the receiving method can read the event payload directly.

 对UseCloudEvents 的调用用于将 CloudEvents 中间件添加到 ASP.NET Core 中间件管道。此中间件将解包使用 CloudEvents 结构化格式的请求,因此接收方法可以直接读取事件载体。

CloudEvents is a standardized messaging format, providing a common way to describe event information across platforms. Dapr embraces CloudEvents. For more information about CloudEvents, see the cloudevents specification.

CloudEvents 是一种标准化的消息传递格式,提供跨平台描述事件信息的通用方式。Dapr 采用 CloudEvents。有关 CloudEvents 的详细信息,请参阅 CloudEvents 规范。

The call to MapSubscribeHandler in the endpoint routing configuration will add a Dapr subscribe endpoint to the application. This endpoint will respond to requests on /dapr/subscribe. When this endpoint is called, it will automatically find all WebAPI action methods decorated with the Topic attribute and instruct Dapr to create subscriptions for them.

终结点路由配置中对MapSubscribeHandler的调用会为应用程序添加Dapr 订阅终结点。此终结点将响应到/dapr/subscribe 上的请求 。调用此终结点时,它将自动查找所有用Topic 特性修饰的WebAPI 操作方法, 并指示 Dapr 为它们创建订阅。

Pub/sub components

Pub/sub 组件

Dapr pub/sub components handle the actual transport of the messages. Several are available. Each encapsulates a specific message broker product to implement the pub/sub functionality. At the time of writing, the following pub/sub components were available:

Dapr pub/sub 组件 处理消息的实际传输。有多个底层实现可用。每个都封装特定消息代理产品以实现发布/订阅功能。撰写本文时,可以使用以下发布/订阅组件:

  • Apache Kafka

  • Azure Event Hubs

  • Azure Service Bus

  • AWS SNS/SQS

  • GCP Pub/Sub

  • Hazelcast

  • MQTT

  • NATS

  • Pulsar

  • RabbitMQ

  • Redis Streams

Note

The Azure cloud stack has both messaging functionality (Azure Service Bus) and event streaming (Azure Event Hub) availability.

备注

Azure cloud stack 具有消息传递(Azure Service Bus)和事件流(Azure Event Hub) 功能。

These components are created by the community in a component-contrib repository on GitHub. You're encouraged to write your own Dapr component for a message broker that isn't yet supported.

这些组件由 GitHub 上的 component-contrib存储库中的社区创建。建议为尚不受支持的消息代理编写自己的 Dapr 组件。

Configure pub/sub components

配置发布/订阅组件

Using a Dapr configuration file, you can specify the pub/sub component(s) to use. This configuration contains several fields. The name field specifies the pub/sub component that you want to use. When sending or receiving a message, you need to specify this name (as you saw earlier in the PublishEventAsync method signature).

使用 Dapr 配置文件,您可以指定要使用的发布/订阅组件。此配置包含多个字段。 name字段指定要使用的发布/订阅组件。发送或接收消息时,需要指定此名称, (如之前在PublishEventAsync 方法签名) 中看到的那样 。

Below you see an example of a Dapr configuration file for configuring a RabbitMQ message broker component:

下面你将看到一个用于配置 RabbitMQ 消息代理组件的 Dapr 配置文件示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub-rq
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: true

In this example, you can see that you can specify any message broker-specific configuration in the metadata block. In this case, RabbitMQ is configured to create durable queues. But the RabbitMQ component has more configuration options. Each of the components' configuration will have its own set of possible fields. You can read which fields are available in the documentation of each pub/sub component.

在此示例中,可以看到,你可以在metadata 块中指定任何特定于消息代理的配置 。此处,RabbitMQ 被配置为创建持久队列。但 RabbitMQ 组件具有更多的配置选项。每个组件的配置将有自己的一组字段。您可以阅读每个发布 /订阅组件的文档来了解其可用的字段。

Next to the programmatic way of subscribing to a topic from code, Dapr pub/sub also provides a declarative way of subscribing to a topic. This approach removes the Dapr dependency from the application code. Therefore, it also enables an existing application to subscribe to topics without any changes to the code. The following example shows a Dapr configuration file for configuring a subscription:

除了以编程方式订阅主题外,Dapr pub/sub 还提供了一种声明方式来订阅主题。此方法会从应用程序代码中删除 Dapr 依赖项。然而,它还允许现有应用程序无需更改代码,就能够订阅主题 。以下示例演示了用于配置订阅的 Dapr 配置文件:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: newOrder-subscription
spec:
  pubsubname: pubsub
  topic: newOrder
  route: /orders
scopes:
- ServiceB
- ServiceC

You have to specify several elements with every subscription:

  • The name of the Dapr pub/sub component you want to use (in this case pubsub).

  • The name of the topic to subscribe to (in this case newOrder).

  • The API operation that needs to be called for this topic (in this case /orders).

  • The scope can specify which services can publish and subscribe to a topic.

必须为每个订阅指定多个元素:

  • Dapr pub/sub 组件的名称(此处为pubsub) 。

  • 订阅的主题名称 (此处为 newOrder) 。

  • 需要为本主题调用 的API 操作 (此处为 /orders) 。

  • 作用域可以指定哪些服务可以发布和订阅主题。

Reference application: eShopOnDapr

参考应用程序:eShopOnDapr

The accompanying eShopOnDapr app provides an end-to-end reference architecture for constructing a microservices application implementing Dapr. eShopOnDapr is an evolution of the widely popular eShopOnContainers app, created several years ago. Both versions use the pub/sub pattern for communicating integration events across microservices. Integration events include:

  • When a user checks-out a shopping basket.

  • When a payment for an order has succeeded.

  • When the grace-period of a purchase has expired.

随附的 eShopOnDapr 应用提供端到端参考架构,用于构造实现 Dapr 的微服务应用程序。eShopOnDapr 是在几年前创建的广泛流行的 eShopOnContainers 应用程序的演变。这两个版本都使用发布/订阅模式来跨微服务传递 集成事件 。集成事件包括:

  • 当用户结账时。

  • 订单的付款成功时。

  • 购买的宽限期已过期时。

Eventing in eShopOnContainers is based on the following IEventBus interface:

EShopOnContainers 中的事件基于以下 IEventBus 接口:

public interface IEventBus
{
    void Publish(IntegrationEvent integrationEvent);

    void Subscribe<T, THandler>()
        where TEvent : IntegrationEvent
        where THandler : IIntegrationEventHandler<T>;
}

Concrete implementations of this interface exist in eShopOnContainers for both RabbitMQ and Azure Service Bus. Each implementation included a great deal of custom plumbing code that was complex to understand and difficult to maintain.

eShopOnContainers 中存在此接口的具体实现(RabbitMQ 和Azure Service Bus) 。每个实现都包含大量自定义的管道代码,这些代码非常复杂,难于理解和维护。

The newer eShopOnDapr significantly simplifies pub/sub behavior by using Dapr. For example, the IEventBus interface was reduced to a single method:

较新的 eShopOnDapr 通过使用 Dapr 大大简化了发布/订阅行为。例如, IEventBus 接口被缩减到单一方法:

public interface IEventBus
{
    Task PublishAsync(IntegrationEvent integrationEvent);
}

Publish events

发布事件

In the updated eShopOnDapr, a single DaprEventBus implementation can support any Dapr-supported message broker. The following code block shows the simplified Publish method. Note how the PublishAsync method uses the Dapr client to publish an event:

在更新了的 eShopOnDapr 中,一个 DaprEventBus 实现可以支持任何 Dapr 支持的消息代理。下面的代码块显示了简化的发布方法。请注意该 PublishAsync 方法如何使用 DaprClient来发布事件:

public class DaprEventBus : IEventBus
{
    private const string PubSubName = "pubsub";

    private readonly DaprClient _daprClient;
    private readonly ILogger<DaprEventBus> _logger;

    public DaprEventBus(DaprClient daprClient, ILogger<DaprEventBus> logger)
    {
        _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task PublishAsync(IntegrationEvent integrationEvent)
    {
        var topicName = integrationEvent.GetType().Name;

        // Dapr uses System.Text.Json which does not support serialization of a
        // polymorphic type hierarchy by default. Using object as the type
        // parameter causes all properties to be serialized.
        await _daprClient.PublishEventAsync<object>(PubSubName, topicName, integrationEvent);
    }
}

As you can see in the code snippet, the topic name is derived from event type's name. Because all eShop services use the IEventBus abstraction, retrofitting Dapr required absolutely no change to the mainline application code.

如代码片段中所示,主题名称从事件类型的名称获得。因为所有 eShop services 都使用 IEventBus 抽象,替换底层组件完全不更改 主线应用程序代码。

Important

The Dapr SDK uses System.Text.Json to serialize/deserialize messages. However, System.Text.Json doesn't serialize properties of derived classes by default. In the eShop code, an event is sometimes explicitly declared as an IntegrationEvent, the base class for integration events. This is done because the concrete event type is determined dynamically at run time based on business logic. As a result, the event is serialized using the type information of the base class and not the derived class. To force System.Text.Json to serialize all properties of the derived class in this case, the code uses object as the generic type parameter. For more information, see the .NET documentation.

重要

Dapr SDK 使用 System.Text.Json 来序列化/反序列化消息。但是, System.Text.Json 默认情况下不会序列化派生类的属性。在 eShop 代码中,事件有时显式声明为 IntegrationEvent (集成事件的基类)。这是因为具体事件类型是在运行时基于业务逻辑动态确定的。因此,使用基类而不是派生类的类型信息对事件进行序列化。若要 System.Text.Json 在这种情况下强制序列化派生类的所有属性,代码需使用 object 作为泛型类型参数。有关详细信息,请参阅 .net 文档。

With Dapr, the infrastructure code is dramatically simplified. It doesn't need to distinguish between the different message brokers. Dapr provides this abstraction for you. And if needed, you can easily swap out message brokers or configure multiple message broker components.

借助 Dapr,可大大简化基础结构代码。不需要区分不同的消息代理。Dapr 为你提供此抽象。如果需要,可以轻松地替换消息代理或配置多个消息代理组件。

Subscribe to events

订阅事件

The earlier eShopOnContainers app contains SubscriptionManagers to handle the subscription implementation for each message broker. Each manager contains complex message broker-specific code for handling subscription events. To receive events, each service has to explicitly register a handler for each event-type.

早前的 eShopOnContainers 应用程序包含 订阅管理器 来处理每个消息代理的订阅实现。每个管理器都包含用于处理订阅事件的消息代理特定的复杂代码。若要接收事件,每个服务必须为每个事件类型显式注册一个处理程序。

eShopOnDapr streamlines the plumbing for event subscriptions by using Dapr ASP.NET Core libraries. Each event is handled by an action method in the controller. A Topic attribute decorates the action method with the name of the corresponding topic to subscribe to. Here's a code snippet taken from the PaymentService:

eShopOnDapr 使用 Dapr ASP.NET Core 库优化了事件订阅的管道。每个事件都由控制器中的操作方法处理。 Topic特性使用主题的名称修饰操作方法。下面是从PaymentService 中获取的代码片段 :

[Route("api/v1/[controller]")]
[ApiController]
public class IntegrationEventController : ControllerBase
{
    private const string DAPR_PUBSUB_NAME = "pubsub";

    private readonly IServiceProvider _serviceProvider;

    public IntegrationEventController(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    [HttpPost("OrderStatusChangedToValidated")]
    [Topic(DAPR_PUBSUB_NAME, "OrderStatusChangedToValidatedIntegrationEvent")]
    public async Task OrderStarted(OrderStatusChangedToValidatedIntegrationEvent integrationEvent)
    {
        var handler = _serviceProvider.GetRequiredService<OrderStatusChangedToValidatedIntegrationEventHandler>();
        await handler.Handle(integrationEvent);
    }
}

In the Topic attribute, the name of the .NET type of the event is used as the topic name. For handling the event, an event handler that already existed in the earlier eShopOnContainers code base is invoked. In the previous example, messages received from the OrderStatusChangedToValidatedIntegrationEvent topic invoke the existing OrderStatusChangedToValidatedIntegrationEventHandler event-handler. Because Dapr implements the underlying plumbing for subscriptions and message brokers, a large amount of original code became obsolete and was removed from the code-base. Much of this code was complex to understand and challenging to maintain.

在 Topic 特性中,事件的 .net 类型的名称将用作主题名称。在处理事件时,将调用以前的 eShopOnContainers 代码中已有的事件处理程序。在上面的示例中,从OrderStatusChangedToValidatedIntegrationEvent 主题接收消息后调用了现有的 OrderStatusChangedToValidatedIntegrationEventHandler 事件处理程序。由于 Dapr 实现了订阅和消息代理的底层管道,因此大量原始代码已过时,并已从代码中删除。这些代码对于理解和维护非常复杂。

Use pub/sub components

使用发布/订阅组件

 Within the eShopOnDapr repository, a deployment folder contains files for deploying the application using different deployment modes: Docker Compose and Kubernetes. A dapr folder exists within each of these folders that holds a components folder. This folder holds a file eshop-pubsub.yaml containing the configuration of the Dapr pub/sub component that the application will use for pub/sub behavior. As you saw in the earlier code snippets, the name of the pub/sub component used is pubsub. Here's the content of the eshop-pubsub.yaml file in the deployment/compose/dapr/components folder:

在 eShopOnDapr 存储库中, deployment 文件夹包含使用不同部署模式(Docker Compose 和 Kubernetes) 部署应用程序的文件。compose文件夹或kubernetes文件夹中的dapr文件夹中都有一个components文件夹(此句不会翻译)。此文件夹包含一个eshop-pubsub.yaml文件,该文件包含应用程序将使用的 Dapr pub/sub 组件的配置。正如您在前面的代码片段中所看到的那样,所使用的 pub/sub 组件的名称为 pubsub 。下面是deployment/compose/dapr/components 文件夹中 eshop-pubsub.yaml 文件的内容 :

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.nats
  version: v1
  metadata:
  - name: natsURL
    value: nats://demo.nats.io:4222

The preceding configuration specifies the desired NATS message broker for this example. To change message brokers, you need only to configure a different message broker, such as RabbitMQ or Azure Service Bus and update the yaml file. With Dapr, there are no changes to your mainline service code when switching message brokers.

前面的配置指定此示例使用的 NATS 消息代理 。若要更改消息代理,只需更新 yaml 文件来配置一个不同的消息代理,如 RabbitMQ 或 Azure Service Bus。使用Dapr,切换消息代理时,无需对主线服务代码进行任何更改。

Finally, you might ask, "Why would I need multiple message brokers in an application?". Many times a system will handle workloads with different characteristics. One event may occur 10 times a day, but another event occurs 5,000 times per second. You may benefit by partitioning messaging traffic to different message brokers. With Dapr, you can add multiple pub/sub component configurations, each with a different name.

最后,您可能会问:"为什么需要在一个应用程序中使用多个消息代理?"。很多时候系统将处理具有不同特征的工作负荷。一个事件可能一天发生10次,但另一个事件每秒发生5000次。可以通过将消息传送流量分区给不同消息代理来受益。使用 Dapr,可以添加多个 pub/sub 组件配置,每个配置使用不同的名称。

Summary

总结

The pub/sub pattern helps you decouple services in a distributed application. The Dapr publish & subscribe building block simplifies implementing this behavior in your application.

Pub/sub 模式可帮助你解耦分布式应用程序中的服务。在应用程序中使用Dapr 发布 & 订阅构建块可简化此工作。

Through Dapr pub/sub, you can publish messages to a specific topic. As well, the building block will query your service to determine which topic(s) to subscribe to.

通过 Dapr pub/sub,你可以将消息发布到特定 主题。同时,构建块将查询你的服务,以确定订阅的主题。

You can use Dapr pub/sub natively over HTTP or by using one of the language-specific SDKs, such as the .NET SDK for Dapr. The .NET SDK tightly integrates with the ASP.NET core platform.

你可以通过 HTTP 或使用特定于语言的 Sdk(如 .NET SDK for Dapr)使用 Dapr pub/sub。.NET SDK 与 ASP.NET core 平台紧密集成。

With Dapr, you can plug a supported message broker product into your application. You can then swap message brokers without requiring code changes to your application.

使用 Dapr,可以将受支持的消息代理插入应用程序。然后,你可以在无需对应用程序进行代码更改的情况下替换消息代理。

目录:

以上是关于面向.NET开发人员的Dapr——发布和订阅的主要内容,如果未能解决你的问题,请参考以下文章

面向.NET开发人员的Dapr——机密

面向.NET开发人员的Dapr——绑定

面向.NET开发人员的Dapr——可观察性

面向.NET开发人员的Dapr——服务调用

面向.NET开发人员的Dapr——状态管理

面向.NET开发人员的Dapr——参考应用程序