使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录
Posted
技术标签:
【中文标题】使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录【英文标题】:Reading BigTable and converting to Generic Records using GCP Cloud DataFlow 【发布时间】:2019-01-07 20:10:58 【问题描述】:我正在尝试使用数据流将 BigTable 表数据转换为通用记录。转换完成后,我必须与存储桶中的另一个数据集进行比较。 下面是我的伪代码,我用过的管道
pipeline
.apply("Read from bigtable", BigTableIo.read)
.apply("Transform BigTable to Avro Genric Records ",
ParDo.of(new TransformAvro(out.toString())))
.apply("Compare to existing avro file ")
.apply("Write back the data to bigTable")
// Function code is below to convert genric record
public class BigTableToAvroFunction
extends DoFn<KV<ByteString, Iterable<Mutation>>, GenericRecord>
@ProcessElement
public void processelement(ProcessContext context)
GenericRecord gen = null ;
ByteString key = context.element().getKey();
Iterable<Mutation> value = context.element().getValue();
KV<ByteString, Iterable<Mutation>> element = context.element();
我被困在这里了。
【问题讨论】:
我不认为GenericRecord 是您想要在这里转换的内容。如果您已经有 Avro 文件,您可以从它们生成一个 Java 类,然后将 HBase Result 转换为该类,或者创建自己的记录类进行比较。 【参考方案1】:目前尚不清楚与存储桶中的现有数据进行比较是什么意思。这取决于您要如何进行比较,文件大小是多少,可能还有其他事情。输入与输出的示例会有所帮助。
例如,如果您要执行的操作类似于 Join 操作,您可以尝试使用 CoGroupByKey
(link to the doc) 连接两个 PCollections
,一个读取 BigTable,另一个读取 Avros from GCS。
或者,如果文件大小合理(适合内存),您可以将其建模为侧输入 (link to the doc)。
或者,最终您始终可以使用原始 GCS API 来查询 ParDo
中的数据并手动执行所有操作。
【讨论】:
我正在尝试编写一个从 bigtable 读取数据并将 ByteString 转换为通用记录的函数。 如何在 pcollections 中加入大表和 avro,因为在读取大表后它给出了不同的数据类型,而 avro 是通用记录。任何示例都会非常有用。以上是关于使用 GCP Cloud DataFlow 读取 BigTable 并转换为通用记录的主要内容,如果未能解决你的问题,请参考以下文章
GCP Dataflow- 从存储中读取 CSV 文件并写入 BigQuery
Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)