在 Java 中将 protobuf 转换为 bigquery
Posted
技术标签:
【中文标题】在 Java 中将 protobuf 转换为 bigquery【英文标题】:Converting protobuf to bigquery in Java 【发布时间】:2020-11-19 00:33:37 【问题描述】:我们将 protobuf 与 GCP 的 pubsub 和数据流一起使用。我们使用单个 proto 文件定义发送到 pubsub 的数据和 bigquery 模式。
publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery
有时数据流会进行一些外观更改,但主要是将字段从 protobuf 复制到 bigquery。
我的问题是,有没有办法自动将 protobuf 模型转换为 bigquery 的 TableRow?
我们现在拥有的简化数据流代码如下。我想消除ProtoToTableRow
类中的大部分代码:
public class MyPipeline
public static void main(String[] args)
events = pipeline.apply("ReadEvents",
PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(table));
// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow>
@ProcessElement
public void processElement(ProcessContext c)
Core.Foo foo = c.element().getFoo();
TableRow fooRow = new TableRow()
.set("id", foo.getId())
.set("bar", foo.getBar())
.set("baz", foo.getBaz());
// similar code repeated for 100s of lines
TableRow row = new TableRow()
.set("foo", foo)
c.output(row);
【问题讨论】:
【参考方案1】:您可以通过一种非常酷的方式完成此操作。 Beam 为各种类(包括 Java Bean、AutoValue 类以及 Protocol Buffers)提供了模式推断方法。
对于您的管道,您不需要转换为 TableRow,您可以执行以下操作:
pipeline.getSchemaRegistry().registerSchemaProvider(
Core.MyProtoObject.class, new ProtoMessageSchema());
events = pipeline.apply("ReadEvents",
PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
. useBeamSchema()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(table));
注意BigQueryIO.write
中的useBeamSchema
参数 - 这将使用自动转换。
【讨论】:
请注意,protobuf 模式提供程序不是核心 beam 的一部分,它是 beam-sdks-java-extensions-protobuf 的一部分:mvnrepository.com/artifact/org.apache.beam/… 似乎 useBeamRows 不存在,所以我需要将其更改为 useBeamSchema。之后我收到类型错误“原因:不存在类型变量的实例,因此 PCollectiondatahem 团队使用 protobuf 注释创建动态 BigQury schmea
https://github.com/mhlabs/datahem.processor/tree/master/generic/src/main/java/org/datahem/processor/generic
参考:https://robertsahlin.com/schema-evolution-in-streaming-dataflow-jobs-and-bigquery-tables-part-1/
【讨论】:
以上是关于在 Java 中将 protobuf 转换为 bigquery的主要内容,如果未能解决你的问题,请参考以下文章