Dapr PubSub 与 dotnet SDK
Posted
技术标签:
【中文标题】Dapr PubSub 与 dotnet SDK【英文标题】:Dapr PubSub with dotnet SDK 【发布时间】:2021-12-10 19:09:46 【问题描述】:我正在尝试使用 dotnet 运行基本的 Dapr 设置。我关注了文档和示例项目,但现在没有运气。
我用 net5.0 创建了一个简单的 dotnet Web API 应用程序。 API 有一个控制器和三对 get/post 端点。每对都针对特定的 pub-sub 提供者(nats、rabbit、Redis)。
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live
[ApiController]
[Route("/")]
public class HomeController : ControllerBase
private readonly ILogger<HomeController> logger;
private readonly DaprClient dapr;
public HomeController(ILogger<HomeController> logger, DaprClient dapr)
this.dapr = dapr;
this.logger = logger;
[HttpGet]
public async Task<ActionResult> Produce()
var message = new Message() Payload = "Nats Jetstream poruka" ;
await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);
return Ok("Sent!");
[HttpPost("nats/subscribe")]
[Topic("nats-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeAsync(Message message)
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
[HttpGet("rabbit")]
public async Task<ActionResult> ProduceRabbit()
var message = new Message() Payload = "Rabbit MQ poruka" ;
await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);
return Ok("Sent!");
//[HttpPost("rabbit/subscribe")]
[Route("rabbit/subscribe")]
[HttpPost()]
[Topic("rabbit-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRabbitAsync(Message message)
this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
[HttpGet("redis")]
public async Task<ActionResult> ProduceRedis()
var message = new Message() Payload = "Redis poruka" ;
await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);
return Ok("Sent!");
[HttpPost("redis/subscribe")]
[Topic("redis-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRedisAsync(Message message)
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
public class Message
public string Payload get; set;
应用程序的 Startup.cs 看起来像
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Live
public class Startup
public void ConfigureServices(IServiceCollection services)
services.AddControllers()
.AddNewtonsoftJson()
.AddDapr();
services.AddHttpClient();
services.AddDaprClient(); //Really no need for this
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
if (env.IsDevelopment())
app.UseDeveloperExceptionPage();
app.UseRouting();
//app.UseCloudEvents();
app.UseEndpoints(endpoints =>
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
);
Dapr 配置
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6380
- name: redisPassword
value: ""
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: nats-pubsub
namespace: default
spec:
type: pubsub.jetstream
version: v1
metadata:
- name: natsURL
value: "nats://localhost:4222"
- name: name
value: "alan"
- name: durableName
value: "conversation-durable"
- name: queueGroupName
value: "conversation-group"
# - name: startSequence
# value: 1
# - name: startTime # in Unix format
# value: 1630349391
# - name: deliverAll
# value: false
# - name: flowControl
# value: false
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: rabbit-pubsub
namespace: default
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
我的 docker-compose 文件
version: '3.4'
services:
nats:
container_name: "Nats"
image: nats
command: [ "-js", "-m", "8222", "-D", "-V" ]
ports:
- "4222:4222"
- "8222:8222"
- "6222:6222"
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
postgres:
container_name: "PostgreSQL"
image: postgres
environment:
- POSTGRES_PASSWORD=rotring123
- PGDATA=/var/lib/postgresql/data/pgdata
# volumes:
# - .\\docker-volumes\\postgreSQL:/var/lib/postgresql/data
ports:
- "8081:8080"
- "5432:5432"
redis:
container_name: Redis
image: redis
ports:
- "6380:6379"
# volumes:
# - .\\docker-volumes\\redis:/usr/local/etc/redis
# dapr-placement:
# container_name: Dapr-service-descovery
# image: "daprio/dapr:1.0.0"
# command: ["./placement", "-port", "50000", "-log-level", "debug"]
# ports:
# - "50000:50000"
# zipkin:
# image: openzipkin/zipkin-slim
# ports:
# - "5411:9411"
我正在使用命令 dapr run -a live -p 5226 dotnet run
从 CLI 启动应用程序
应用程序已启动,当我去获取端点消息时已发送。我可以确认消息已发送到消息代理并且有效负载正常。此外,Dapr 调用我的 post 端点(每个 rabbit、nats 和 redis),但在方法参数中,我收到了 Payload
类的 Payload
属性的 null
值。
我遵循了 TrafficControll 示例,在我看来一切都设置正确。
Dapr 运行时版本:1.4.3
以下是日志截图:https://prnt.sc/1xa8s14
非常感谢任何帮助!
【问题讨论】:
【参考方案1】:将[FromBody]
属性添加到操作方法参数中。
例如:
public async Task<ActionResult> SubscribeAsync([FromBody] Message message)
【讨论】:
您好,很抱歉回复晚了。我设法让这个工作,问题出在app.UseCloudEvents()
。属性ApiController
也是必需的。如果我设置[FromBody]
属性,它就不起作用。据我了解,Dapr 不会以这种方式发送有效负载。为了确定,我还在运行之前从 nats 中删除了所有消费者。我也在 dapr discord 频道上问过这个问题,所以你可以看到我的问题和讨论there。
[FromBody]
应该可以工作。 link 在“Dapr 发布和订阅构建块”下显示了一个示例。你有对Dapr.AspNetCore
NuGet 包的引用吗?
嗯,我在文档中找不到它:)。 prnt.sc/1z4o08g
Pub/Sub 部分 - 使用 Dapr .NET SDK - 第 70 页。在 PDF 中搜索这句话:“要发布消息,DaprClient 会公开 PublishEventAsync 方法”
它对我也不起作用,可以将数据发布到服务总线但不会触发控制器订阅方法以上是关于Dapr PubSub 与 dotnet SDK的主要内容,如果未能解决你的问题,请参考以下文章
为啥 Google.Pubsub.V1 beta01 不适用于 dotnet cli 项目?