Beam - 读取 AVRO 并转换

Posted

技术标签:

【中文标题】Beam - 读取 AVRO 并转换【英文标题】:Beam - Read AVRO and transform 【发布时间】:2018-12-12 00:06:22 【问题描述】:

我必须从 Cloud Storage 读取一个 AVRO 文件,然后将记录写入一个带有行键的大表中,并将 AVRO 作为列单元格中的字节。我正在使用 AVROIO.read 将数据读取为 GenericRecord 。 . 如何应用 pardo 函数将数据转换为可以写入 bigtable 的内容

// Read AVRO from GCS

pipeline
  .apply("Read from Avro",
    AvroIO
       .readGenericRecords(schema)
       .from(options.getInputFilePattern()))

//.apply - pardo transformation 

.apply("Write to Bigtable", write);

非常感谢您对管道第二步的任何帮助

更新:

感谢 Anton 的快速帮助,我现在明白我必须做什么,并想出了下面的 pardo

 pipeline
   .apply("Read from Avro",
               AvroIO
                 .readGenericRecords(schema)
                 .from(options.getInputFilePattern()))
   .apply(ParDo.of(new DoFn<GenericRecord,  Iterable<Mutation> >() 
       @ProcessElement
       public void processElement(ProcessContext c) 
            GenericRecord gen = c.element();
            byte[] fieldNameByte = null;
            byte[] fieldValueByte = null;

            // ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
            for (Schema.Field field : fields) 

                try 
                   String fieldName = field.name();
                   fieldNameByte = fieldName.getBytes("UTF-8");
                   String value = String.valueOf(gen.get(fieldName));
                   fieldValueByte = value.getBytes("UTF-8");
                 catch (Exception e) 
                   e.printStackTrace();
                

                Iterable<Mutation> mutations =
                  ImmutableList.of(
                     Mutation.newBuilder()
                         .setSetCell(
                           Mutation.SetCell.newBuilder()
                              .setValue(
                                   ByteString.copyFrom(fieldValueByte))
                               .setFamilyName(COLUMN_FAMILY_NAME))
                         .build());
                c.output(,mutations));
              
          
       ))
   .apply("Write to Bigtable", write);
 return pipeline.run();

这只是一个伪代码,我只是在学习和尝试.. 我需要帮助来将突变添加到 ProcessContext 并进行写入.. 请看一下,让我知道我是否在正确的方向以及如何将突变添加到上下文中

【问题讨论】:

【参考方案1】:

类似的东西:

Pipeline p = Pipeline.create(options);
p.apply(GenerateSequence.from(0).to(numRows))
 .apply(
     ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() 
         @ProcessElement
         public void processElement(ProcessContext c) 
             int index = c.element().intValue();

             Iterable<Mutation> mutations =
                ImmutableList.of(
                   Mutation.newBuilder()
                           .setSetCell(Mutation.SetCell.newBuilder()
                           .setValue(testData.get(index).getValue())
                           .setFamilyName(COLUMN_FAMILY_NAME))
                           .build());
             c.output(KV.of(testData.get(index).getKey(), mutations));
         
     ))
 .apply(
    BigtableIO
      .write()
      .withBigtableOptions(bigtableOptions)
      .withTableId(tableId));

复制自Bigtable integration test。

一般还有here is Beam doc on ParDo,here's javadoc for BigtableIO,它有一些解释。

【讨论】:

感谢 Anton .. 我查看了示例,但我仍然对如何迭代 AVRO 通用记录并将值转换为可以写入 BigTable 的突变感到困惑。 public void processElement(ProcessContext ctx) GenericRecord genericRecord = ctx.element(); Schema 模式 = new Schema.Parser().parse(schemaJson);我需要一些帮助来理解将 genericRecord 转换为可以插入 BigTable 列的突变(从 avro 记录中提取字节) 我不确定我是否完全理解。要从通用记录中获取值,请使用genericRecord.get("field_name"),它会为您提供一个对象。然后您必须根据您在 BigTable 中存储的内容将其转换为字节字符串。那部分是你的业务逻辑,你自己决定你希望你的对象如何被序列化。如果适合您的用例,您可以尝试使用其他人使用的辅助类,例如:github.com/apache/beam/blob/… 如果您对如何序列化对象有疑问,您应该阅读该主题,例如这是一个如何将对象转换为字节数组的示例:***.com/questions/2836646/…(这是一个示例,您可以选择对象的表示方式)。如果需要,您可以使用ByteStrings.copyFrom(byteArray) 安东,我用更新编辑了我原来的问题..请看看并提供反馈 您的方法总体上是有意义的。下一步取决于您的 BigTable 应该是什么样子。不过要注意的一件事是,在您的示例中,对于输入 Avro 对象的每个字段,您都会发出一个单独的单元格突变 - c.output(,mutations),不确定这是否是您想要的。另一件事是您需要从 ParDo 发出一个键值对、一个字节字符串键和一组突变值,因此您必须选择一个键(例如,输入 Avro 对象的一个​​字段) ,并构造一个类似于我的示例的KV

以上是关于Beam - 读取 AVRO 并转换的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam GCP 在动态创建的目录中上传 Avro

Apache-beam Bigquery .fromQuery ClassCastException

0016-Avro序列化&反序列化和Spark读取Avro数据

有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?

Apache Beam 处理文件

在 Spark 中读取 Avro 文件