Avro Schema Evolution with GenericData.Record - Mapreduce 过程

Posted

技术标签:

【中文标题】Avro Schema Evolution with GenericData.Record - Mapreduce 过程【英文标题】:Avro Schema Evolution With GenericData.Record - Mapreduce process 【发布时间】:2016-05-06 21:51:25 【问题描述】:

我有一个 mapreduce 程序,它从 avro 数据中读取数据,对其进行处理并输出 avro 数据。我有这个 avro 数据的模式,假设有 4 列。 我使用 GenericData.Record 来编写 avro 数据。

现在我在此数据之上创建一个猪关系,其架构具有 5 列。第 5 列是新的,具有在 avsc 文件中定义的默认值。 根据我的理解,我应该能够使用带有一个附加列的新模式来读取旧数据(由 4 列生成)。 相反,我收到一条错误消息 - 尝试访问不存在的列。

我错过了什么?

Mapreduce 驱动程序代码

Job job = Job.getInstance(getConf());  
job.setJarByClass(DeltaCaptureMRJobDriverWithSameSchema.class);
job.setJobName("CDC");
job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);

//This is required to use avro-1.7.6 and above
job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setInputFormatClass(AvroKeyInputFormat.class);
job.setMapperClass(DeltaCaptureMapperMultiPaths.class);
Schema schema = new Schema.Parser().parse(new File(args[2]));
AvroJob.setInputKeySchema(job, schema);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AvroValue.class);
AvroJob.setMapOutputValueSchema(job, schema);

job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setReducerClass(DeltaCaptureReducerMultiPaths.class);
AvroJob.setOutputKeySchema(job, schema);
job.setOutputKeyClass(AvroKey.class);

return (job.waitForCompletion(true) ? 0 : 1);

映射器代码

public class DeltaCaptureMapperMultiPaths extends Mapper<AvroKey<GenericData.Record>, NullWritable, Text , AvroValue<GenericData.Record>> 

    private static final Logger LOG = Logger.getLogger(DeltaCaptureMapperMultiPaths.class);

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException 
        try 
            System.out.println("Specific Record - " + key);
            System.out.println("Datum :: " + key.datum());
            System.out.println("Schema :: " + key.datum().getSchema());
            AvroValue<GenericData.Record> outValue = new AvroValue<GenericData.Record>(key.datum());
            System.out.println("Generic Record (out) - " + key.datum());
            context.write(new Text(key.datum().get("id") +""), outValue);
         catch (Exception e) 
            e.printStackTrace();
        
    

Reducer 代码

public class DeltaCaptureReducerMultiPaths extends Reducer<Text, AvroValue<GenericData.Record>, AvroKey<GenericData.Record>, NullWritable> 

    @Override
    public void reduce(Text  key, Iterable<AvroValue<GenericData.Record>> values, Context context) throws IOException, InterruptedException 
        for(AvroValue<GenericData.Record> value : values) 
            AvroKey<GenericData.Record> outKey = new AvroKey<GenericData.Record>(value.datum());
            context.write(outKey, NullWritable.get());
        
    

假设 MR 输出到 /etl/out。现在下面的猪脚本因我在开头描述的错误而失败。

a= LOAD '/etl/out' USING org.apache.pig.builtin.AvroStorage('hdfs:///etl/test.avsc')
b = FOREACH a GENERATE $0,$1,$2,$3,$4;

hdfs:///etl/test.avsc 有 5 个字段(第 5 个是新字段)。

【问题讨论】:

如果你在猪里做dump a,那会是什么样子? dump a 给出 4 个字段的值。理想情况下,我希望 5 个字段,第 5 个字段为空,因为 avsc 中有 5 个字段。 describe a 也只给出 4 个字段。看起来它没有采用 AvroStorage 构造函数中传递的 avsc !! 【参考方案1】:

正如怀疑的那样,问题实际上在于 AvroStorage 的使用方式。用法是:a= LOAD '/etl/out' USING org.apache.pig.builtin.AvroStorage('test','-schemafile file:///etl/test.avsc') ,假设 test.avsc 在本地文件系统中。将 avsc 保存在 hdfs 中也应该有效,但我没有进行测试。我不知道为什么没有关于这些的明确文档!!!!

【讨论】:

找到一篇不错的文章,其中包含更多信息:michael-noll.com/blog/2013/07/04/… 确认 HDFS 位置也可以正常工作。还解释了no-schema-check 选项。

以上是关于Avro Schema Evolution with GenericData.Record - Mapreduce 过程的主要内容,如果未能解决你的问题,请参考以下文章

序列化avro schema

json CWAAS请求的Avro Schema

AVRO schema 的使用方法

Avro Schema Registry 的价值是啥?

如何在 Spark 中将 Avro Schema 对象转换为 StructType

用Java来测试Avro数据格式在Kafka的传输,及测试Avro Schema的兼容性