BigQuery writeTableRows 始终写入缓冲区

Posted

技术标签:

【中文标题】BigQuery writeTableRows 始终写入缓冲区【英文标题】:BigQuery writeTableRows Always writing to buffer 【发布时间】:2018-04-19 07:34:34 【问题描述】:

我们正在尝试使用 Apache Beam 和 avro 写入 Big Query。

以下似乎工作正常:-

p.apply("Input", AvroIO.read(DataStructure.class).from("AvroSampleFile.avro"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Load", BigQueryIO.writeTableRows().to(table).withSchema(schema));

然后我们尝试通过以下方式使用它从 Google Pub/Sub 获取数据

p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run().waitUntilFinish();

当我们这样做时,它总是将它推送到缓冲区,Big Query 似乎需要很长时间才能从缓冲区中读取。谁能告诉我为什么上面不会将记录直接写入 Big Query 表?

更新:- 看起来我需要添加以下设置,但这会引发 java.lang.IllegalArgumentException。

.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))

【问题讨论】:

【参考方案1】:

答案是您需要像这样包含“withNumFileShards”(可以是 1 到 1000)。

        p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
            .withMethod(Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run().waitUntilFinish();

我在任何地方都找不到这说明 withNumFileShards 是强制性的,但是我在修复后找到了一个 Jira 票证。

https://issues.apache.org/jira/browse/BEAM-3198

【讨论】:

以上是关于BigQuery writeTableRows 始终写入缓冲区的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?

表行中的 BigqueryIO 架构

開始学习swift开发

雷电源代码分析-- 进入游戏開始界面

怎样開始学习ADF和Jdeveroper 11g

怎样開始学习ADF和Jdeveroper 11g