使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表
Posted
技术标签:
【中文标题】使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表【英文标题】:Using apache beam JsonTimePartitioning to create time partitioned tables in bigqiery 【发布时间】:2019-07-19 12:52:18 【问题描述】:我尝试使用 apache beam JAVA sdk 中的 JsonTimePartitioning 类将数据写入 bigquery 中的动态表,但我得到 JsonTimePartitioning 类的“找不到符号”。
这就是我尝试导入课程的方式
import com.google.api.services.bigquery.model.JsonTimePartitioning;
这就是我尝试在管道中使用它的方式
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withJsonTimePartitioningTo(new JsonTimePartitioning().setType("DAY")));
【问题讨论】:
【参考方案1】:我似乎在任何地方都找不到JsonTimePartitioning
。你能指出一个你试图效仿的例子吗? BigQueryIO
上的现有方法要么接受 TimePartiotioning
的实例,要么接受实际上是同一 TimePartitioning
的 JSON 序列化实例的 value-provider for a String
。事实上,当调用该方法的TimePartitioning
版本时,您最终仍然只是serializing it into string internally:。您可以找到它的使用示例here:
将历史数据加载到按时间分区的 BigQuery 表中 将历史数据放入按时间分区的 BigQuery 表中,指定
BigQueryIO.Write.withTimePartitioning(com.google.api.services.bigquery.model.TimePartitioning)
具有用于基于列的分区的字段。例如:PCollection<Quote> quotes = ...; quotes.apply(BigQueryIO.write() .withSchema(schema) .withFormatFunction(quote -> new TableRow() .set("timestamp", quote.getTimestamp()) .set(..other columns..)) .to("my-project:my_dataset.my_table") .withTimePartitioning(new TimePartitioning().setField("time"))); ```
【讨论】:
嗨 Anton,我尝试使用您的解决方案,但我想这不适用于动态目的地。我通过将 tablepartitioning 参数添加到 TableDestination 来解决它,而不是使用 BigQueryIO.Write 如下TableDestination(destination, null, new TimePartitioning().setField("CheckInDate"))
以上是关于使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表
使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误
如何在 Apache Beam 中解析 2 个 json 文件
如何在Apache Beam / Google Dataflow中使用ParseJsons?
如何过滤坏和好的 json 事件,然后增加坏 json 记录的指标计数,并使用 java [关闭] 将这些记录存储在 apache Beam 中