BigQuery writeTableRows 始终写入缓冲区
Posted
技术标签:
【中文标题】BigQuery writeTableRows 始终写入缓冲区【英文标题】:BigQuery writeTableRows Always writing to buffer 【发布时间】:2018-04-19 07:34:34 【问题描述】:我们正在尝试使用 Apache Beam 和 avro 写入 Big Query。
以下似乎工作正常:-
p.apply("Input", AvroIO.read(DataStructure.class).from("AvroSampleFile.avro"))
.apply("Transform", ParDo.of(new CustomTransformFunction()))
.apply("Load", BigQueryIO.writeTableRows().to(table).withSchema(schema));
然后我们尝试通过以下方式使用它从 Google Pub/Sub 获取数据
p.begin()
.apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
.apply("Transform", ParDo.of(new CustomTransformFunction()))
.apply("Write", BigQueryIO.writeTableRows()
.to(table)
.withSchema(schema)
.withTimePartitioning(timePartitioning)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
当我们这样做时,它总是将它推送到缓冲区,Big Query 似乎需要很长时间才能从缓冲区中读取。谁能告诉我为什么上面不会将记录直接写入 Big Query 表?
更新:- 看起来我需要添加以下设置,但这会引发 java.lang.IllegalArgumentException。
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
【问题讨论】:
【参考方案1】:答案是您需要像这样包含“withNumFileShards”(可以是 1 到 1000)。
p.begin()
.apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
.apply("Transform", ParDo.of(new CustomTransformFunction()))
.apply("Write", BigQueryIO.writeTableRows()
.to(table)
.withSchema(schema)
.withTimePartitioning(timePartitioning)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
我在任何地方都找不到这说明 withNumFileShards 是强制性的,但是我在修复后找到了一个 Jira 票证。
https://issues.apache.org/jira/browse/BEAM-3198
【讨论】:
以上是关于BigQuery writeTableRows 始终写入缓冲区的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?