在 Java 中将 protobuf 转换为 bigquery

Posted

技术标签:

【中文标题】在 Java 中将 protobuf 转换为 bigquery【英文标题】:Converting protobuf to bigquery in Java 【发布时间】:2020-11-19 00:33:37 【问题描述】:

我们将 protobuf 与 GCP 的 pubsub 和数据流一起使用。我们使用单个 proto 文件定义发送到 pubsub 的数据和 bigquery 模式。

publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery

有时数据流会进行一些外观更改,但主要是将字段从 protobuf 复制到 bigquery。

我的问题是,有没有办法自动将 protobuf 模型转换为 bigquery 的 TableRow?

我们现在拥有的简化数据流代码如下。我想消除ProtoToTableRow 类中的大部分代码:

public class MyPipeline 
    public static void main(String[] args) 
        events = pipeline.apply("ReadEvents",
                PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
        events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
                .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withExtendedErrorInfo()
                        .to(table));
    


// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> 

    @ProcessElement
    public void processElement(ProcessContext c) 
        Core.Foo foo = c.element().getFoo();
        TableRow fooRow = new TableRow()
                .set("id", foo.getId())
                .set("bar", foo.getBar())
                .set("baz", foo.getBaz());

        // similar code repeated for 100s of lines

        TableRow row = new TableRow()
                .set("foo", foo)

        c.output(row);
    

【问题讨论】:

【参考方案1】:

您可以通过一种非常酷的方式完成此操作。 Beam 为各种类(包括 Java Bean、AutoValue 类以及 Protocol Buffers)提供了模式推断方法。

对于您的管道,您不需要转换为 TableRow,您可以执行以下操作:

pipeline.getSchemaRegistry().registerSchemaProvider(
    Core.MyProtoObject.class, new ProtoMessageSchema());

events = pipeline.apply("ReadEvents",
                PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
                        . useBeamSchema()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withExtendedErrorInfo()
                        .to(table));

注意BigQueryIO.write 中的useBeamSchema 参数 - 这将使用自动转换。

【讨论】:

请注意,protobuf 模式提供程序不是核心 beam 的一部分,它是 beam-sdks-java-extensions-protobuf 的一部分:mvnrepository.com/artifact/org.apache.beam/… 似乎 useBeamRows 不存在,所以我需要将其更改为 useBeamSchema。之后我收到类型错误“原因:不存在类型变量的实例,因此 PCollection 符合 PCollection”(事务是我的原型的名称)。有什么想法吗? 回答我自己的问题 - 我需要像 BigQuery.write 一样向 BigQuery.write 提供类型信息。我会编辑答案 嗯实际上它仍然无法正常工作。现在我从这一行得到运行时错误 checkArgument(input.hasSchema());即使我认为我通过 pipeline.getSchemaRegistry().registerSchemaProvider(Core.MyProtoObject.class, new ProtoMessageSchema());【参考方案2】:

datahem 团队使用 protobuf 注释创建动态 BigQury schmea

https://github.com/mhlabs/datahem.processor/tree/master/generic/src/main/java/org/datahem/processor/generic

参考:https://robertsahlin.com/schema-evolution-in-streaming-dataflow-jobs-and-bigquery-tables-part-1/

【讨论】:

以上是关于在 Java 中将 protobuf 转换为 bigquery的主要内容,如果未能解决你的问题,请参考以下文章

Java:JSON -> Protobuf & 反向转换

在 Dart 中将 Map 的所有键和值转换为字符串

如何在Java中将String转换为long?

在 Java 中将 2e+08 转换为整数

在Java中将String转换为double

在java中将bmp转换为jpg