BigQueryIO.write() 使用 SQL 函数

Posted

技术标签:

【中文标题】BigQueryIO.write() 使用 SQL 函数【英文标题】:BigQueryIO.write() use SQL functions 【发布时间】:2021-08-10 09:25:57 【问题描述】:

我有一个 Dataflow 流式传输作业。我正在使用 BigqueryIO.write 库将行插入 BigQuery 表。 BQ表中有一个列,应该是存储行创建时间戳的。我需要使用 SQL 函数“CURRENT_TIMESTAMP()”来更新该列的值。

我无法使用任何 java 库(如 Instant.now())来获取当前时间戳。因为这将在作业执行期间得出该值。我正在使用 BigQuery 加载作业,其触发频率为 10 分钟。因此,如果我使用任何 java 库来派生时间戳,那么它将不会返回预期的输出。

我在 BigqueryIO.write 中找不到任何方法,该方法将任何 SQL 函数作为输入。那么这个问题有什么解决办法呢?

【问题讨论】:

使用行创建时间戳,您是指生成元素的那一刻吗?如果是这样,您可以对 DoFn 中的上下文使用 .timestamp() 方法。这应该返回元素本身的时间戳。 @Iñigo,timestamp() 方法无济于事。因为它会在 Dataflow 作业执行期间尝试构建时间戳。但正如我在描述中提到的,我需要使用 BigqueryIO File Load 方法将数据插入 BQ 表。触发频率为 10 分钟,这意味着实际 BQ 插入将在 Dataflow 作业执行后 20 分钟(或更多根据数据量可能会分成多个批次)发生。 c.timestamp()c 是上下文)在执行期间不会尝试构建时间戳,但它将是元素“创建”的时间戳。例如,如果元素是从 PubSub 读取的消息,c.timestamp() 将是该消息的发布时间。无论如何,不​​确定这是否适用于您的情况。也许使用withFormatFunction 并在那里添加时间戳? 【参考方案1】:

听起来您希望 BigQuery 根据插入行的时间为每一行分配一个时间戳。我能想到的唯一方法是向 BigQuery 提交一个 QueryJob,其中包含一个 INSERT 语句,其中包括 CURRENT_TIMESTAMP() 以及其他列的值。但是这种方法对数据量的扩展性不是特别好,BigQueryIO.write() 也不支持。

BigQueryIO.write 支持批量加载、流式插入 API 和存储写入 API,据我所知,这些 API 都没有像您建议的那样提供注入 BigQuery 端时间戳的方法。

【讨论】:

是的,我考虑过使用 QueryJob。但是出于您解释的相同原因,我没有使用它。由于流式作业预计会有大量流量,因此性能问题是这里的一个问题。

以上是关于BigQueryIO.write() 使用 SQL 函数的主要内容,如果未能解决你的问题,请参考以下文章

数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException

如何使用 dataflowsdk 将数据从 bigquery 转录到 bigquery?

BigQuery 代码段中的错误

是否有任何形式可写入 BigQuery 以动态指定目标表的名称?

数据流 bigquery 单元测试

PL/SQL开发中动态SQL的使用方法