使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表

Posted

技术标签:

【中文标题】使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表【英文标题】:Using apache beam JsonTimePartitioning to create time partitioned tables in bigqiery 【发布时间】:2019-07-19 12:52:18 【问题描述】:

我尝试使用 apache beam JAVA sdk 中的 JsonTimePartitioning 类将数据写入 bigquery 中的动态表,但我得到 JsonTimePartitioning 类的“找不到符号”。

这就是我尝试导入课程的方式

import com.google.api.services.bigquery.model.JsonTimePartitioning;

这就是我尝试在管道中使用它的方式

  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withJsonTimePartitioningTo(new JsonTimePartitioning().setType("DAY")));

【问题讨论】:

【参考方案1】:

我似乎在任何地方都找不到JsonTimePartitioning。你能指出一个你试图效仿的例子吗? BigQueryIO 上的现有方法要么接受 TimePartiotioning 的实例,要么接受实际上是同一 TimePartitioning 的 JSON 序列化实例的 value-provider for a String。事实上,当调用该方法的TimePartitioning 版本时,您最终仍然只是serializing it into string internally:。您可以找到它的使用示例here:

将历史数据加载到按时间分区的 BigQuery 表中 将历史数据放入按时间分区的 BigQuery 表中,指定 BigQueryIO.Write.withTimePartitioning(com.google.api.services.bigquery.model.TimePartitioning) 具有用于基于列的分区的字段。例如:

 PCollection<Quote> quotes = ...;

 quotes.apply(BigQueryIO.write()
         .withSchema(schema)
         .withFormatFunction(quote -> new TableRow()
            .set("timestamp", quote.getTimestamp())
            .set(..other columns..))
         .to("my-project:my_dataset.my_table")
         .withTimePartitioning(new TimePartitioning().setField("time"))); ```

【讨论】:

嗨 Anton,我尝试使用您的解决方案,但我想这不适用于动态目的地。我通过将 tablepartitioning 参数添加到 TableDestination 来解决它,而不是使用 BigQueryIO.Write 如下TableDestination(destination, null, new TimePartitioning().setField("CheckInDate"))

以上是关于使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误

如何在 Apache Beam 中解析 2 个 json 文件

如何在Apache Beam / Google Dataflow中使用ParseJsons?

如何过滤坏和好的 json 事件,然后增加坏 json 记录的指标计数,并使用 java [关闭] 将这些记录存储在 apache Beam 中

Apache Beam 批量到 BigQuery,中间文件,它们是不是仅以 JSON 格式生成