从 Dataflow python 作业写入 bigquery 中的分区表

Posted

技术标签:

【中文标题】从 Dataflow python 作业写入 bigquery 中的分区表【英文标题】:Writing to partitioned table in bigquery from Dataflow python job 【发布时间】:2018-11-27 17:01:47 【问题描述】:

当我从数据流写入 bigquery 中的分区表时,我收到以下错误 - 谁能帮我解决这个问题?

无效的表 ID \"test$20181126\"。表 ID 必须是字母数字(加上下划线)并且长度不得超过 1024 个字符。此外,不能使用表格装饰器。

这是我用来写的python sn-p

import apache_beam as beam


class bqwriter(beam.PTransform):
    def __init__(self, table, schema):
        super(BQWriter, self).__init__()
        self.table = table
        self.schema = schema

    def expand(self, pcoll):
        pcoll | beam.io.Write(beam.io.BigQuerySink(
            self.table,
            schema=self.schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

我正在创建像下面这样的标签

a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)

【问题讨论】:

【参考方案1】:

我遇到了同样的问题。我的解决方案是:

要么在数据中添加日期列,然后设置要在其上分区的 BQ 表

或者在 BQ 的 _PARTITIONTIME 上设置默认分区。

这两个选项都意味着您只插入到 test-123:test.test

至于我们是否应该能够做你想做的事,似乎是的。 Beam JIRA 声明他们为 Java 修复了它,但我找不到 python 的状态。

【讨论】:

那我怎样才能让它每天截断和加载 我不确定这是否可行。您是否测试过仅写入您的表并假设 BQ 将通过 _PARTITIONTIME / 自定义分区列完成工作?根据您的用例,您最好安排加载作业/查询【参考方案2】:

最好的方法是将函数传递给原生 beam.io.WriteToBigQuery 类:

def table_fn(element):
    current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
    return f"bq_output_table$current_date"

user_parent_user_watchever_pcol | "Write to BigQuery" >> 
beam.io.Write(
    beam.io.WriteToBigQuery(
        table_fn,
        schema=schemas.VIDEOCATALOG_SCHEMA,
        method="STREAMING_INSERTS",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    )
)

【讨论】:

以上是关于从 Dataflow python 作业写入 bigquery 中的分区表的主要内容,如果未能解决你的问题,请参考以下文章

使用 DataFlow 作业加载分区表

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应

如何使用 python 将字典写入 Dataflow 中的 Bigquery

在Dataflow作业中查找重复的数据 - Python

从 REST API 更新 Dataflow 作业

从 Dataflow 作业连接到 Cloud SQL