使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表
Posted
技术标签:
【中文标题】使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表【英文标题】:Write to a datepartitioned Bigquery table using the beam.io.gcp.bigquery.WriteToBigQuery module in apache beam 【发布时间】:2019-07-09 15:35:58 【问题描述】:我正在尝试编写一个数据流作业,该作业需要处理位于存储中的日志并将它们写入不同的 BigQuery 表中。将使用哪些输出表取决于日志中的记录。因此,我对日志进行了一些处理,并使用基于日志中的值的键生成它们。之后,我将日志分组到密钥上。我需要将分组在同一个键上的所有日志写入一个表。
我正在尝试将 beam.io.gcp.bigquery.WriteToBigQuery
模块与可调用的模块一起用作documentation here 中所述的表参数
我想使用一个日期分区表,因为这样我可以很容易地在不同的分区上write_truncate
。
现在我遇到两个主要问题:
CREATE_IF_NEEDED
给出错误,因为它必须创建分区表。我可以通过确保表存在于上一步中来规避这种情况,如果不存在则创建它们。
如果我加载旧数据,我会收到以下错误:
The destination table's partition table_name_x$20190322 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date."
这似乎是流插入的限制,有什么方法可以进行批量插入?
也许我正在接近这个错误,应该使用另一种方法。 任何有关如何解决这些问题的指导表示赞赏。
我正在使用 python 3.5 和 apache-beam=2.13.0
【问题讨论】:
【参考方案1】:当混合使用摄取时间分区表和列分区表时,可能会记录该错误消息(参见类似的issue)。从链接总结,不能使用基于列的分区(不是ingestion-time partitioning)和写入带有分区后缀的表。
在您的情况下,由于您想根据日志中的值写入不同的表并在每个表中都有分区,因此在选择哪个表时放弃使用分区装饰器(使用“[prefix]_YYYYMMDD”)和然后让每个单独的表为column-based partitioned。
【讨论】:
我不确定我是如何混合这两种分区的。我的表是摄取时间分区的(伪列_PARTITIONTIME),我正在使用装饰器在指定的分区中加载数据。如果我保持在流插入的31 days in the past and 16 days in the future
限制内,这会很好。但据我了解,WriteToBigQuery
模块将使用批量插入 (FILE_LOADS) 方法,从而避免流时间限制。我也考虑过使用基于列的分区,但是在重新运行作业时很难避免重复。
您是否使用了“--experiments use_beam_bq_sink”标志?要启用在流插入和批量插入之间切换的功能,SDK 需要传入这个实验。编辑:我刚刚在***.com/questions/43505534/… 中发现了一个类似的问题。但是对于您的用例,启用该标志可能会解决它。以上是关于使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表的主要内容,如果未能解决你的问题,请参考以下文章
在 python Apache Beam 中打开一个 gzip 文件
使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS
ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表
作为数据流运行器运行时,apache beam.io.BigQuerySource use_standard_sql 不起作用