从 Dataflow python 作业写入 bigquery 中的分区表
Posted
技术标签:
【中文标题】从 Dataflow python 作业写入 bigquery 中的分区表【英文标题】:Writing to partitioned table in bigquery from Dataflow python job 【发布时间】:2018-11-27 17:01:47 【问题描述】:当我从数据流写入 bigquery 中的分区表时,我收到以下错误 - 谁能帮我解决这个问题?
无效的表 ID \"test$20181126\"。表 ID 必须是字母数字(加上下划线)并且长度不得超过 1024 个字符。此外,不能使用表格装饰器。
这是我用来写的python sn-p
import apache_beam as beam
class bqwriter(beam.PTransform):
def __init__(self, table, schema):
super(BQWriter, self).__init__()
self.table = table
self.schema = schema
def expand(self, pcoll):
pcoll | beam.io.Write(beam.io.BigQuerySink(
self.table,
schema=self.schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
))
我正在创建像下面这样的标签
a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)
【问题讨论】:
【参考方案1】:我遇到了同样的问题。我的解决方案是:
要么在数据中添加日期列,然后设置要在其上分区的 BQ 表
或者在 BQ 的 _PARTITIONTIME 上设置默认分区。
这两个选项都意味着您只插入到 test-123:test.test
至于我们是否应该能够做你想做的事,似乎是的。 Beam JIRA 声明他们为 Java 修复了它,但我找不到 python 的状态。
【讨论】:
那我怎样才能让它每天截断和加载 我不确定这是否可行。您是否测试过仅写入您的表并假设 BQ 将通过 _PARTITIONTIME / 自定义分区列完成工作?根据您的用例,您最好安排加载作业/查询【参考方案2】:最好的方法是将函数传递给原生 beam.io.WriteToBigQuery
类:
def table_fn(element):
current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
return f"bq_output_table$current_date"
user_parent_user_watchever_pcol | "Write to BigQuery" >>
beam.io.Write(
beam.io.WriteToBigQuery(
table_fn,
schema=schemas.VIDEOCATALOG_SCHEMA,
method="STREAMING_INSERTS",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)
)
【讨论】:
以上是关于从 Dataflow python 作业写入 bigquery 中的分区表的主要内容,如果未能解决你的问题,请参考以下文章
从 PubSub 导出到 BigQuery - Dataflow 没有任何反应