如何调用按需bigquery数据传输服务?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何调用按需bigquery数据传输服务?相关的知识,希望对你有一定的参考价值。
我真的很喜欢BigQuery的数据传输服务。我在要加载到BQ的确切模式中有平面文件。仅仅设置DTS调度表来挑选匹配模式的GCS文件并将其加载到BQ中真是太棒了。我喜欢内置选项,可以在复制和发送电子邮件后在遇到麻烦时删除源文件。但是最大的遗憾是最小间隔为60分钟。这很疯狂。我本来可以住得晚10分钟。
因此,如果我将DTS设置为按需设置,如何从API调用它?我正在考虑创建一个cronjob,每10分钟调用一次。但我无法通过文档弄清楚如何调用它。
此外,将GCS文件(不需要ETL)移到与确切模式匹配的bq表中的第二最好,最可靠,最便宜的方法是什么。我应该使用Cloud Scheduler,Cloud Functions,DataFlow,Cloud Run等吗?
如果使用Cloud Function,如何在调用时将我的GCS中的所有文件作为一个bq加载作业提交?
最后,有人知道DTS将来是否会将限制降低到10分钟?
因此,如果我将DTS设置为按需设置,如何从API调用它?我正在考虑创建一个cronjob,每10分钟调用一次。但是我无法通过文档弄清楚如何调用它。
StartManualTransferRuns
是RPC library的一部分,但截至目前还没有等效的REST API。如何使用将取决于您的环境。例如,您可以使用Python客户端库(docs)。
作为示例,我使用了以下代码(您需要使用pip install google-cloud-bigquery-datatransfer
作为依赖关系:]]
import time from google.cloud import bigquery_datatransfer_v1 from google.protobuf.timestamp_pb2 import Timestamp client = bigquery_datatransfer_v1.DataTransferServiceClient() PROJECT_ID = 'PROJECT_ID' TRANSFER_CONFIG_ID = '5e6...7bc' # alphanumeric ID you'll find in the UI parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID) start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10)) response = client.start_manual_transfer_runs(parent, requested_run_time=start_time) print(response)
请注意,您需要使用正确的传输配置ID,并且
requested_run_time
的类型必须为bigquery_datatransfer_v1.types.Timestamp
(文档中没有示例)。我将开始时间设置为比当前执行时间早10秒。
您应该得到诸如以下的回复:
runs name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04" destination_dataset_id: "DATASET_NAME" schedule_time seconds: 1579358571 nanos: 922599371 ... data_source_id: "google_cloud_storage" state: PENDING params ... run_time seconds: 1579358581 user_id: 28...65
并且转移已按预期触发(请注意错误):
此外,将GCS文件(不需要ETL)移到与确切模式匹配的bq表中的第二最好,最可靠,最便宜的方法是什么。我应该使用Cloud Scheduler,Cloud Functions,DataFlow,Cloud Run等吗?
这样,您可以设置一个cron作业,每十分钟执行一次功能。当然,这不是很可靠,在这里您可以跟进您的后续问题。
我认为这些范围可能太广,无法在单个StackOverflow问题中解决,但是我想说,对于您的用例,Cloud Scheduler + Cloud Functions / Cloud Run可以很好地工作。
如果需要ETL,最好是数据流,但是它具有一个GCS连接器,可以监视文件模式(example)。这样,您将跳过传输,设置监视间隔和加载作业触发频率以将文件写入BigQuery。与以前的方法相反,虚拟机将在流水线管道中持续运行。
如果您有复杂的工作流程/依赖性,Airflow最近引入了operators以开始手动运行。
如果使用Cloud Function,如何在调用时将我的GCS中的所有文件作为一个bq加载作业提交?
] >>创建转移时,您可以使用wildcards:
最后,有人知道DTS将来是否会将限制降低到10分钟吗?
[C0已经有一个功能请求。随时star表示您的兴趣并接收更新
以上是关于如何调用按需bigquery数据传输服务?的主要内容,如果未能解决你的问题,请参考以下文章
如何在视图或计划查询之间进行选择,以对通过 Stitch 导入的 BigQuery 表进行重复数据删除?
如何将 API 调用返回的数据直接加载到 BigQuery 中而不存储在 GCS 中?
如何使用 Python + 服务帐户创建 BigQuery 数据传输?