如何调用按需bigquery数据传输服务?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何调用按需bigquery数据传输服务?相关的知识,希望对你有一定的参考价值。

我真的很喜欢BigQuery的数据传输服务。我在要加载到BQ的确切模式中有平面文件。仅仅设置DTS调度表来挑选匹配模式的GCS文件并将其加载到BQ中真是太棒了。我喜欢内置选项,可以在复制和发送电子邮件后在遇到麻烦时删除源文件。但是最大的遗憾是最小间隔为60分钟。这很疯狂。我本来可以住得晚10分钟。

因此,如果我将DTS设置为按需设置,如何从API调用它?我正在考虑创建一个cronjob,每10分钟调用一次。但我无法通过文档弄清楚如何调用它。

此外,将GCS文件(不需要ETL)移到与确切模式匹配的bq表中的第二最好,最可靠,最便宜的方法是什么。我应该使用Cloud Scheduler,Cloud Functions,DataFlow,Cloud Run等吗?

如果使用Cloud Function,如何在调用时将我的GCS中的所有文件作为一个bq加载作业提交?

最后,有人知道DTS将来是否会将限制降低到10分钟?

答案

因此,如果我将DTS设置为按需设置,如何从API调用它?我正在考虑创建一个cronjob,每10分钟调用一次。但是我无法通过文档弄清楚如何调用它。

StartManualTransferRunsRPC library的一部分,但截至目前还没有等效的REST API。如何使用将取决于您的环境。例如,您可以使用Python客户端库(docs)。

作为示例,我使用了以下代码(您需要使用pip install google-cloud-bigquery-datatransfer作为依赖关系:]]

import time

from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp


client = bigquery_datatransfer_v1.DataTransferServiceClient()

PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc'  # alphanumeric ID you'll find in the UI 

parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)

start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))

response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)

请注意,您需要使用正确的传输配置ID,并且requested_run_time的类型必须为bigquery_datatransfer_v1.types.Timestamp(文档中没有示例)。我将开始时间设置为比当前执行时间早10秒。

您应该得到诸如以下的回复:

runs 
  name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
  destination_dataset_id: "DATASET_NAME"
  schedule_time 
    seconds: 1579358571
    nanos: 922599371
  
  ...
  data_source_id: "google_cloud_storage"
  state: PENDING
  params 
    ...
  
  run_time 
    seconds: 1579358581
  
  user_id: 28...65

并且转移已按预期触发(请注意错误):

enter image description here

此外,将GCS文件(不需要ETL)移到与确切模式匹配的bq表中的第二最好,最可靠,最便宜的方法是什么。我应该使用Cloud Scheduler,Cloud Functions,DataFlow,Cloud Run等吗?

这样,您可以设置一个cron作业,每十分钟执行一次功能。当然,这不是很可靠,在这里您可以跟进您的后续问题。

我认为这些范围可能太广,无法在单个StackOverflow问题中解决,但是我想说,对于您的用例,Cloud Scheduler + Cloud Functions / Cloud Run可以很好地工作。

如果需要ETL,最好是数据流,但是它具有一个GCS连接器,可以监视文件模式(example)。这样,您将跳过传输,设置监视间隔和加载作业触发频率以将文件写入BigQuery。与以前的方法相反,虚拟机将在流水线管道中持续运行。

如果您有复杂的工作流程/依赖性,Airflow最近引入了operators以开始手动运行。

如果使用Cloud Function,如何在调用时将我的GCS中的所有文件作为一个bq加载作业提交?

] >>

创建转移时,您可以使用wildcards

enter image description here

最后,有人知道DTS将来是否会将限制降低到10分钟吗?

[C0已经有一个功能请求。随时star表示您的兴趣并接收更新

以上是关于如何调用按需bigquery数据传输服务?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Google Bigquery 中触发按需计划查询

如何在视图或计划查询之间进行选择,以对通过 Stitch 导入的 BigQuery 表进行重复数据删除?

如何将 API 调用返回的数据直接加载到 BigQuery 中而不存储在 GCS 中?

如何使用 Python + 服务帐户创建 BigQuery 数据传输?

BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)

如何使用API 而不是使用Google BigQuery数据传输服务?