MassTransit 与 Kafka 和 NodaTime

Posted

技术标签:

【中文标题】MassTransit 与 Kafka 和 NodaTime【英文标题】:MassTransit with Kafka and NodaTime 【发布时间】:2022-01-22 02:01:28 【问题描述】:

我在 C# 代码中有以下消息:

public interface ResourcePerformance

    public string ResourceId  get; 
    public List<TimeSection> TimeSections  get; 

public class TimeSection

    public Instant PeriodStart  get; set; 
    public Instant PeriodEnd  get; set; 
    public PerformanceStatus PerformanceStatus  get; set; 
    public Duration ProcessingTime  get; set; 
    public double Quantity  get; set; 

我想反序列化来自 Kafka 主题的此类消息。但是从 NodaTime 库中反序列化类型时会出现错误,例如:

Confluent.Kafka.ConsumeException: Local: Value deserialization error
 ---> Newtonsoft.Json.JsonSerializationException: Error converting value "2:00:00" to type 'NodaTime.Duration'. Path 'timeSections[0].processingTime', line 1, position 330.

我想这一定是 NodaTime 序列化的原因,因为当我将 NodaTime 类型更改为对象时,没有报告错误。我已经在配置的 RabbitMQ 部分为 json 序列化程序配置了 NodaTime,但我不知道如何在 Kafka 部分执行此操作。目前我有以下配置:

services.AddMassTransit(x =>
        
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((context, cfg) =>
            

                cfg.ConfigureJsonSerializer( j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );
                cfg.ConfigureEndpoints(context);
            );
            
            x.AddRider(rider =>
                                                 
                rider.AddConsumer<ResourcePerformanceConsumer>();

                rider.UsingKafka((context, k) =>
                
                                            
                    k.TopicEndpoint<string, ResourcePerformance>("performances-resource", kafkaConsumerGroup , e =>
                    
                        e.AutoOffsetReset = AutoOffsetReset.Earliest;
                        e.CreateIfMissing(t =>
                        
                            t.NumPartitions = 4; //number of partitions
                            t.ReplicationFactor = 1; //number of replicas
                        );
                        e.ConfigureConsumer<ResourcePerformanceConsumer>(context);
                    );
                );
            );
        );

        services.AddMassTransitHostedService();

如何正确反序列化它?

【问题讨论】:

过去几天有一个关于使用 NodaTime 和 MassTransit 的不和谐线程。我建议在那里检查。 谢谢!!! - 我在那里找到了解决方案:) 【参考方案1】:

我在 MassTransit discord 上找到了答案。除了配置序列化器,还需要配置反序列化器:

cfg.ConfigureJsonDeserializer(j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );

【讨论】:

以上是关于MassTransit 与 Kafka 和 NodaTime的主要内容,如果未能解决你的问题,请参考以下文章

使用 Azure 服务总线 V7 的 MassTransit:缺少属性 TokenProvider

使用交换和使用 MassTransit 的路由密钥发布消息

我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?

RabbitMQ 和 MassTransit 之间的连接

未在代码中指定的交换上的MassTransit ACCESS_REFUSED

MassTransit中Request&Response基本使用