GenericRecord 的 Avro 架构:能够保留空白字段

Posted

技术标签:

【中文标题】GenericRecord 的 Avro 架构:能够保留空白字段【英文标题】:Avro Schema for GenericRecord: Be able to leave blank fields 【发布时间】:2020-03-08 19:39:29 【问题描述】:

我正在使用 Java 将 JSON 转换为 Avro,并使用 Google DataFlow 将这些存储到 GCS。 Avro 模式是在运行时使用 SchemaBuilder 创建的。

我在架构中定义的字段之一是可选的 LONG 字段,它的定义如下:

SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record(mainName).fields();
Schema concreteType = SchemaBuilder.nullable().longType();
fields.name("key1").type(concreteType).noDefault();

现在,当我使用上面的架构创建 GenericRecord 并且未设置“key1”时,将生成的 GenericRecord 放入我的 DoFn 的上下文中时:context.output(res); 我收到以下错误:

线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.avro.UnresolvedUnionException: Not in union ["long","null"]: 256

我也尝试用withDefault(0L) 做同样的事情,得到了同样的结果。

我错过了什么? 谢谢

【问题讨论】:

能否用完整的管道代码更新您的帖子? 【参考方案1】:

当我尝试如下时它工作正常,您可以尝试打印有助于比较的架构,您也可以删除长类型的 nullable() 来尝试。

fields.name("key1").type().nullable().longType().longDefault(0);

提供了我用来测试的完整代码:

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.RecordBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class GenericRecordExample 

  public static void main(String[] args) 

    FieldAssembler<Schema> fields;
    RecordBuilder<Schema> record = SchemaBuilder.record("Customer");
    fields = record.namespace("com.example").fields();
    fields = fields.name("first_name").type().nullable().stringType().noDefault();
    fields = fields.name("last_name").type().nullable().stringType().noDefault();
    fields = fields.name("account_number").type().nullable().longType().longDefault(0);

    Schema schema = fields.endRecord();
    System.out.println(schema.toString());

    // we build our first customer
    GenericRecordBuilder customerBuilder = new GenericRecordBuilder(schema);
    customerBuilder.set("first_name", "John");
    customerBuilder.set("last_name", "Doe");
    customerBuilder.set("account_number", 999333444111L);
    Record myCustomer = customerBuilder.build();
    System.out.println(myCustomer);

    // writing to a file
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) 
      dataFileWriter.create(myCustomer.getSchema(), new File("customer-generic.avro"));
      dataFileWriter.append(myCustomer);
      System.out.println("Written customer-generic.avro");
     catch (IOException e) 
      System.out.println("Couldn't write file");
      e.printStackTrace();
    

    // reading from a file
    final File file = new File("customer-generic.avro");
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    GenericRecord customerRead;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader))
      customerRead = dataFileReader.next();
      System.out.println("Successfully read avro file");
      System.out.println(customerRead.toString());

      // get the data from the generic record
      System.out.println("First name: " + customerRead.get("first_name"));

      // read a non existent field
      System.out.println("Non existent field: " + customerRead.get("not_here"));
    
    catch(IOException e) 
      e.printStackTrace();
    
  

【讨论】:

【参考方案2】:

如果我正确理解您的问题,您正在尝试接受 JSON 字符串并将它们保存在 Cloud Storage 存储桶中,使用 Avro 作为数据在数据流中移动时的编码器。从您的代码中看不出任何对我来说看起来不对的地方。我已经完成了这项工作,包括将数据保存到 Cloud Storage 和 BigQuery。

您可能会考虑使用一种更简单且可能不太容易出错的方法:为您的数据定义一个 Java 类并在其上使用 Avro 注释以使编码器能够正常工作。这是一个例子:

import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Data 
    public long nonNullableValue;
    @Nullable public long nullableValue;

然后,在您的 DnFn 实现中使用这种类型,就像您可能已经使用的那样。 Beam 应该能够使用 Avro 在工作人员之间正确移动数据,即使标记为 @Nullable 的字段为空。

【讨论】:

以上是关于GenericRecord 的 Avro 架构:能够保留空白字段的主要内容,如果未能解决你的问题,请参考以下文章

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

Beam - 读取 AVRO 并转换

如果字段顺序更改,Avro 模式不兼容

如何读取大的avro文件,并将整个文件加载到内存中。

Schema注册和解决

推断 BigQuery 表加载的 avro 架构