Dapr PubSub 与 dotnet SDK

Posted

技术标签:

【中文标题】Dapr PubSub 与 dotnet SDK【英文标题】:Dapr PubSub with dotnet SDK 【发布时间】:2021-12-10 19:09:46 【问题描述】:

我正在尝试使用 dotnet 运行基本的 Dapr 设置。我关注了文档和示例项目,但现在没有运气。

我用 net5.0 创建了一个简单的 dotnet Web API 应用程序。 API 有一个控制器和三对 get/post 端点。每对都针对特定的 pub-sub 提供者(nats、rabbit、Redis)。

using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live

    [ApiController]
    [Route("/")]
    public class HomeController : ControllerBase
    
        private readonly ILogger<HomeController> logger;
        private readonly DaprClient dapr;
        public HomeController(ILogger<HomeController> logger, DaprClient dapr)
        
            this.dapr = dapr;
            this.logger = logger;

        
        [HttpGet]
        public async Task<ActionResult> Produce()
        
            var message = new Message()  Payload = "Nats Jetstream poruka" ;
            await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);

            return Ok("Sent!");
        


        [HttpPost("nats/subscribe")]        
        [Topic("nats-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeAsync(Message message)
                  
            
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        


        [HttpGet("rabbit")]
        public async Task<ActionResult> ProduceRabbit()
        
            var message = new Message()  Payload = "Rabbit MQ poruka" ;
            await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);

            return Ok("Sent!");
        


        //[HttpPost("rabbit/subscribe")]
        [Route("rabbit/subscribe")]
        [HttpPost()]
        [Topic("rabbit-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRabbitAsync(Message message)
                    
            this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        


        [HttpGet("redis")]
        public async Task<ActionResult> ProduceRedis()
        
            var message = new Message()  Payload = "Redis poruka" ;
            await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);

            return Ok("Sent!");
        


        [HttpPost("redis/subscribe")]
        [Topic("redis-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRedisAsync(Message message)
        
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        
    

    
    public class Message
    
        public string Payload  get; set; 
    

应用程序的 Startup.cs 看起来像

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Live

    public class Startup
            
        public void ConfigureServices(IServiceCollection services)
        
            services.AddControllers()
                .AddNewtonsoftJson()
                .AddDapr();
            services.AddHttpClient();
            services.AddDaprClient(); //Really no need for this
        
                
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        
            if (env.IsDevelopment())
            
                app.UseDeveloperExceptionPage();
            

            app.UseRouting();
            //app.UseCloudEvents();

            app.UseEndpoints(endpoints =>
            
                endpoints.MapSubscribeHandler();
                endpoints.MapControllers();
            );
        
    

Dapr 配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6380
  - name: redisPassword
    value: ""

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: nats-pubsub
  namespace: default
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: name
    value: "alan"
  - name: durableName
    value: "conversation-durable"
  - name: queueGroupName
    value: "conversation-group"
  # - name: startSequence
  #   value: 1
  # - name: startTime # in Unix format
  #   value: 1630349391
  # - name: deliverAll
  #   value: false
  # - name: flowControl
  #   value: false

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbit-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"

我的 docker-compose 文件

version: '3.4'

services: 
  nats:
    container_name: "Nats"
    image: nats
    command: [ "-js", "-m", "8222", "-D", "-V" ]
    ports:
      - "4222:4222"
      - "8222:8222"
      - "6222:6222"

  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"

  postgres:
    container_name: "PostgreSQL"
    image: postgres
    environment:
      - POSTGRES_PASSWORD=rotring123
      - PGDATA=/var/lib/postgresql/data/pgdata
    # volumes:
    #   - .\\docker-volumes\\postgreSQL:/var/lib/postgresql/data
    ports:
      - "8081:8080"
      - "5432:5432"

  redis:
    container_name: Redis
    image: redis
    ports:
      - "6380:6379"
    # volumes:
    #   - .\\docker-volumes\\redis:/usr/local/etc/redis

  # dapr-placement:
  #   container_name: Dapr-service-descovery
  #   image: "daprio/dapr:1.0.0"
  #   command: ["./placement", "-port", "50000", "-log-level", "debug"]
  #   ports:
  #     - "50000:50000"

  # zipkin:
  #   image: openzipkin/zipkin-slim
  #   ports:
  #     - "5411:9411"

我正在使用命令 dapr run -a live -p 5226 dotnet run 从 CLI 启动应用程序

应用程序已启动,当我去获取端点消息时已发送。我可以确认消息已发送到消息代理并且有效负载正常。此外,Dapr 调用我的 post 端点(每个 rabbit、nats 和 redis),但在方法参数中,我收到了 Payload 类的 Payload 属性的 null 值。

我遵循了 TrafficControll 示例,在我看来一切都设置正确。

Dapr 运行时版本:1.4.3 以下是日志截图:https://prnt.sc/1xa8s14

非常感谢任何帮助!

【问题讨论】:

【参考方案1】:

[FromBody] 属性添加到操作方法参数中。

例如:

public async Task<ActionResult> SubscribeAsync([FromBody] Message message)

【讨论】:

您好,很抱歉回复晚了。我设法让这个工作,问题出在app.UseCloudEvents()。属性ApiController 也是必需的。如果我设置[FromBody] 属性,它就不起作用。据我了解,Dapr 不会以这种方式发送有效负载。为了确定,我还在运行之前从 nats 中删除了所有消费者。我也在 dapr discord 频道上问过这个问题,所以你可以看到我的问题和讨论there。 [FromBody] 应该可以工作。 link 在“Dapr 发布和订阅构建块”下显示了一个示例。你有对Dapr.AspNetCore NuGet 包的引用吗? 嗯,我在文档中找不到它:)。 prnt.sc/1z4o08g Pub/Sub 部分 - 使用 Dapr .NET SDK - 第 70 页。在 PDF 中搜索这句话:“要发布消息,DaprClient 会公开 PublishEventAsync 方法” 它对我也不起作用,可以将数据发布到服务总线但不会触发控制器订阅方法

以上是关于Dapr PubSub 与 dotnet SDK的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Google.Pubsub.V1 beta01 不适用于 dotnet cli 项目?

Dapr介绍

面向.NET开发人员的Dapr——目录

“处理组件 pubsub 错误:组件 pubsub 的初始化超时超过 5 秒”

面向.NET开发人员的Dapr- actors 构建块

使用 GCloud 模拟器的 Google Cloud PubSub V1