使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录

Posted

技术标签:

【中文标题】使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录【英文标题】:Reading BigTable and converting to Generic Records using GCP Cloud DataFlow 【发布时间】:2019-01-07 20:10:58 【问题描述】:

我正在尝试使用数据流将 BigTable 表数据转换为通用记录。转换完成后,我必须与存储桶中的另一个数据集进行比较。 下面是我的伪代码,我用过的管道

  pipeline
     .apply("Read from bigtable", BigTableIo.read)
     .apply("Transform BigTable to Avro Genric Records ",
         ParDo.of(new TransformAvro(out.toString())))
     .apply("Compare to existing avro file ")
     .apply("Write back the data to bigTable")

// Function code is below to convert genric record     

 public class BigTableToAvroFunction
    extends DoFn<KV<ByteString, Iterable<Mutation>>, GenericRecord>  
       @ProcessElement
       public void processelement(ProcessContext context)
         GenericRecord gen = null ;
         ByteString key = context.element().getKey();
         Iterable<Mutation> value  = context.element().getValue();
         KV<ByteString, Iterable<Mutation>> element = context.element(); 
  

我被困在这里了。

【问题讨论】:

我不认为GenericRecord 是您想要在这里转换的内容。如果您已经有 Avro 文件,您可以从它们生成一个 Java 类,然后将 HBase Result 转换为该类,或者创建自己的记录类进行比较。 【参考方案1】:

目前尚不清楚与存储桶中的现有数据进行比较是什么意思。这取决于您要如何进行比较,文件大小是多少,可能还有其他事情。输入与输出的示例会有所帮助。

例如,如果您要执行的操作类似于 Join 操作,您可以尝试使用 CoGroupByKey (link to the doc) 连接两个 PCollections,一个读取 BigTable,另一个读取 Avros from GCS。

或者,如果文件大小合理(适合内存),您可以将其建模为侧输入 (link to the doc)。

或者,最终您始终可以使用原始 GCS API 来查询 ParDo 中的数据并手动执行所有操作。

【讨论】:

我正在尝试编写一个从 bigtable 读取数据并将 ByteString 转换为通用记录的函数。 如何在 pcollections 中加入大表和 avro,因为在读取大表后它给出了不同的数据类型,而 avro 是通用记录。任何示例都会非常有用。

以上是关于使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录的主要内容,如果未能解决你的问题,请参考以下文章

Dataflow API 不会在 GCP 中激活

GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

Dataflow SQL (GCP) 不支持使用 STRUCT 的嵌套行

Google Cloud Pub/Sub 中的积压工作

GCP Dataflow + Apache Beam - 缓存问题