BigQueryIO.write() 使用 SQL 函数
Posted
技术标签:
【中文标题】BigQueryIO.write() 使用 SQL 函数【英文标题】:BigQueryIO.write() use SQL functions 【发布时间】:2021-08-10 09:25:57 【问题描述】:我有一个 Dataflow 流式传输作业。我正在使用 BigqueryIO.write 库将行插入 BigQuery 表。 BQ表中有一个列,应该是存储行创建时间戳的。我需要使用 SQL 函数“CURRENT_TIMESTAMP()”来更新该列的值。
我无法使用任何 java 库(如 Instant.now())来获取当前时间戳。因为这将在作业执行期间得出该值。我正在使用 BigQuery 加载作业,其触发频率为 10 分钟。因此,如果我使用任何 java 库来派生时间戳,那么它将不会返回预期的输出。
我在 BigqueryIO.write 中找不到任何方法,该方法将任何 SQL 函数作为输入。那么这个问题有什么解决办法呢?
【问题讨论】:
使用行创建时间戳,您是指生成元素的那一刻吗?如果是这样,您可以对 DoFn 中的上下文使用.timestamp()
方法。这应该返回元素本身的时间戳。
@Iñigo,timestamp() 方法无济于事。因为它会在 Dataflow 作业执行期间尝试构建时间戳。但正如我在描述中提到的,我需要使用 BigqueryIO File Load 方法将数据插入 BQ 表。触发频率为 10 分钟,这意味着实际 BQ 插入将在 Dataflow 作业执行后 20 分钟(或更多根据数据量可能会分成多个批次)发生。
c.timestamp()
(c
是上下文)在执行期间不会尝试构建时间戳,但它将是元素“创建”的时间戳。例如,如果元素是从 PubSub 读取的消息,c.timestamp()
将是该消息的发布时间。无论如何,不确定这是否适用于您的情况。也许使用withFormatFunction
并在那里添加时间戳?
【参考方案1】:
听起来您希望 BigQuery 根据插入行的时间为每一行分配一个时间戳。我能想到的唯一方法是向 BigQuery 提交一个 QueryJob,其中包含一个 INSERT 语句,其中包括 CURRENT_TIMESTAMP()
以及其他列的值。但是这种方法对数据量的扩展性不是特别好,BigQueryIO.write()
也不支持。
BigQueryIO.write
支持批量加载、流式插入 API 和存储写入 API,据我所知,这些 API 都没有像您建议的那样提供注入 BigQuery 端时间戳的方法。
【讨论】:
是的,我考虑过使用 QueryJob。但是出于您解释的相同原因,我没有使用它。由于流式作业预计会有大量流量,因此性能问题是这里的一个问题。以上是关于BigQueryIO.write() 使用 SQL 函数的主要内容,如果未能解决你的问题,请参考以下文章
数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException
如何使用 dataflowsdk 将数据从 bigquery 转录到 bigquery?