使用 C# 反序列化 Avro 文件

Posted

技术标签:

【中文标题】使用 C# 反序列化 Avro 文件【英文标题】:Deserialize an Avro file with C# 【发布时间】:2017-02-12 07:07:36 【问题描述】:

我找不到使用 C# 反序列化 Apache Avro 文件的方法。 Avro 文件是由 Microsoft Azure 事件中心中的Archive feature 生成的文件。

使用 Java,我可以使用 Apache 的 Avro Tools 将文件转换为 JSON:

java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json

使用 NuGet 包 Microsoft.Hadoop.Avro 我能够提取 SequenceNumberOffsetEnqueuedTimeUtc,但由于我不知道 Body 使用什么类型抛出异常。我试过Dictionary<string, object>和其他类型。

static void Main(string[] args)

    var fileName = "...";

    using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
    
        using (var reader = AvroContainer.CreateReader<EventData>(stream))
        
            using (var streamReader = new SequentialReader<EventData>(reader))
            
                var record = streamReader.Objects.FirstOrDefault();
            
        
    


[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData

    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber  get; set; 

    [DataMember(Name = "Offset")]
    public string Offset  get; set; 

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc  get; set; 

    [DataMember(Name = "Body")]
    public foo Body  get; set; 

    // More properties...

架构如下所示:


  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    
      "name": "SequenceNumber",
      "type": "long"
    ,
    
      "name": "Offset",
      "type": "string"
    ,
    
      "name": "EnqueuedTimeUtc",
      "type": "string"
    ,
    
      "name": "SystemProperties",
      "type": 
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      
    ,
    
      "name": "Properties",
      "type": 
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      
    ,
    
      "name": "Body",
      "type": [ "null", "bytes" ]
    
  ]
    

【问题讨论】:

这可能会有所帮助:http://***.com/questions/43993644/reading-event-hub-archive-file-in-c-sharp 快速编码怎么样?有人有问题吗? 【参考方案1】:

我建议你使用https://github.com/AdrianStrugala/AvroConvert

简单地说:

byte[] avroFileContent = File.ReadAllBytes(fileName);
var result = AvroConvert.Deserialize<EventData>(avroFileContent);

该库本身旨在通过使用 Avro 格式改进开发流程。您甚至不需要模型上的架构或属性。 (我是这个库的贡献者)

【讨论】:

【参考方案2】:

您还可以使用NullableSchema 属性将Body 标记为字节和null 的并集。这将允许您使用强类型接口。

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData

    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber  get; set; 

    [DataMember(Name = "Offset")]
    public string Offset  get; set; 

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc  get; set; 

    [DataMember(Name = "Body")]
    [NullableSchema]
    public foo Body  get; set; 

【讨论】:

【参考方案3】:

此Gist 展示了如何使用 Microsoft.Hadoop.Avro2 使用 C# 反序列化事件中心捕获,其优点是同时兼容 .NET Framework 4.5 和 .NET Standard 1.6:

 var connectionString = "<Azure event hub capture storage account connection string>";
 var containerName = "<Azure event hub capture container name>";
 var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";

 var storageAccount = CloudStorageAccount.Parse(connectionString);
 var blobClient = storageAccount.CreateCloudBlobClient();
 var container = blobClient.GetContainerReference(containerName);
 var blob = container.GetBlockBlobReference(blobName);
 using (var stream = blob.OpenRead())
 using (var reader = AvroContainer.CreateGenericReader(stream))
     while (reader.MoveNext())
         foreach (dynamic result in reader.Current.Objects)
         
             var record = new AvroEventData(result);
             record.Dump();
         

 public struct AvroEventData
 
     public AvroEventData(dynamic record)
     
         SequenceNumber = (long) record.SequenceNumber;
         Offset = (string) record.Offset;
         DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
         EnqueuedTimeUtc = enqueuedTimeUtc;
         SystemProperties = (Dictionary<string, object>) record.SystemProperties;
         Properties = (Dictionary<string, object>) record.Properties;
         Body = (byte[]) record.Body;
     
     public long SequenceNumber  get; set; 
     public string Offset  get; set; 
     public DateTime EnqueuedTimeUtc  get; set; 
     public Dictionary<string, object> SystemProperties  get; set; 
     public Dictionary<string, object> Properties  get; set; 
     public byte[] Body  get; set; 
 

NuGet 参考:

Microsoft.Hadoop.Avro2(1.2.1 有效) WindowsAzure.Storage(8.3.0 有效)

命名空间:

Microsoft.Hadoop.Avro.Container Microsoft.WindowsAzure.Storage

【讨论】:

请avoid "link-only answers"。考虑到该链接将来可能会断开,如果没有它,答案也应该有用。 非常好我想做类似的事情这里是我的详细问题有帮助吗? ***.com/questions/48462311/…【参考方案4】:

我终于能够让它与 Apache C# 库/框架一起使用。 我被卡住了一段时间,因为 Azure 事件中心的捕获功能有时会输出一个没有任何消息内容的文件。 我可能还对最初如何将消息序列化为 EventData 对象有疑问。 下面的代码用于从捕获 blob 容器保存到磁盘的文件。

var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)

   // Do work on EventData object

这也适用于使用 GenericRecord 对象。

var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);

这需要一些努力才能弄清楚。但是,我现在同意此 Azure 事件中心捕获功能是备份所有事件的绝佳功能。我仍然觉得他们应该像使用 Stream Analytic 作业输出一样将格式设为可选,但也许我会习惯 Avro。

【讨论】:

你是如何使用这段代码的? DataFileReader在哪个nuget中? @neo 可以在Confluent.Apache.Avronuget 包中找到与上述代码一起使用的 DataFileReader 的一个版本。【参考方案5】:

我能够使用dynamic 获得完整的数据访问权限。这是访问原始body 数据的代码,该数据存储为字节数组。就我而言,这些字节包含 UTF8 编码的 JSON,但当然这取决于您最初创建发布到事件中心的 EventData 实例的方式:

using (var reader = AvroContainer.CreateGenericReader(stream))

    while (reader.MoveNext())
    
        foreach (dynamic record in reader.Current.Objects)
        
            var sequenceNumber = record.SequenceNumber;
            var bodyText = Encoding.UTF8.GetString(record.Body);
            Console.WriteLine($"sequenceNumber: bodyText");
        
    

如果有人可以发布静态类型的解决方案,我会投赞成票,但鉴于任何系统中较大的延迟几乎肯定是与事件中心存档 blob 的连接,我不会担心解析性能。 :)

【讨论】:

非常好我想做类似的事情这是我的详细问题有帮助吗? ***.com/questions/48462311/… 作为脚注,当事件中心捕获为您提供“空”blob(在该捕获间隔期间在给定分区中未收到任何事件)时 - 在我的情况下,此类 blob 的大小目前为 508 B - - 然后反序列化失败,神秘地抱怨“大小”参数 (System.ArgumentOutOfRangeException: 'Specified argument was out of the range of valid values. Parameter name: size')。只要捕获 blob 中至少存在一个事件,相同的逻辑就会再次起作用。 我在流中得到无效的 Avro 对象容器。标头无法识别。【参考方案6】:

你剩下的类型,我怀疑应该定义为:

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData

    [DataMember]
    public IDictionary<string, object> SystemProperties  get; set; 

    [DataMember]
    public IDictionary<string, object> Properties  get; set; 

    [DataMember]
    public byte[] Body  get; set; 

尽管Bodynullbytes 的联合,但它映射到nullable byte[]

在 C# 中,数组始终是引用类型,因此可以是 null 并履行合同。

【讨论】:

谢谢,不过没用:Could not find any matching known type for 'System.Collections.Generic.IDictionary`2[System.String,System.Object]'. @KristofferJälén 这个例外是专门针对Body 属性的吗? 不,该例外是针对 SystemProperties 属性的。 尝试使用[KnownType(typeof(Dictionary&lt;string, object&gt;))] 之类的注释类?否则你也可以尝试使用具体类型而不是接口。

以上是关于使用 C# 反序列化 Avro 文件的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息

Avro序列化与反序列化

Apache Avro:简单的序列化和反序列化操作的Demo

使用 kafka lib 反序列化 PRIMITIVE AVRO KEY

反序列化 Avro Spark

使用avro序列化和反序列化