使用 Google.Cloud.BigQuery.V2 的 BigQuery 加载作业的幂等性

Posted

技术标签:

【中文标题】使用 Google.Cloud.BigQuery.V2 的 BigQuery 加载作业的幂等性【英文标题】:Idempotency for BigQuery load jobs using Google.Cloud.BigQuery.V2 【发布时间】:2018-05-14 10:07:29 【问题描述】:

您可以使用 Google.Cloud.BigQuery.V2 中具有 CreateLoadJob 方法的 BigQueryClient 创建 csv 加载作业,以从 Google Cloud Storage 中的 csv 文件加载数据。

您如何使用此 API 保证幂等性,以确保在收到响应之前说网络中断,并且您开始重试,您最终不会多次将相同的数据加载到 BigQuery 中?

API 使用示例

    private void LoadCsv(string sourceUri, string tableId, string timePartitionField)
    
        var tableReference = new TableReference()
        
            DatasetId = _dataSetId,
            ProjectId = _projectId,
            TableId = tableId
        ;

        var options = new CreateLoadJobOptions
        
            WriteDisposition = WriteDisposition.WriteAppend,
            CreateDisposition = CreateDisposition.CreateNever,
            SkipLeadingRows = 1,
            SourceFormat = FileFormat.Csv,
            TimePartitioning = new TimePartitioning
            
                Type = _partitionByDayType,
                Field = timePartitionField
            
        ;

        BigQueryJob loadJob = _bigQueryClient.CreateLoadJob(sourceUri: sourceUri,
                                                            destination: tableReference,
                                                            schema: null,
                                                            options: options);

        loadJob.PollUntilCompletedAsync().Wait();
        if (loadJob.Status.Errors == null || !loadJob.Status.Errors.Any())
        
            //Log success
            return;
        
        //Log error
    

【问题讨论】:

【参考方案1】:

有两个地方可能最终会丢失响应:

创建作业开始时 轮询完成时

第一个在没有作业 ID 的情况下相对难以恢复;您可以列出项目中的所有工作,并尝试找到一个与您原本要创建的工作相似的工作。

但是,C# 客户端库会生成一个作业 ID,以便 可以重试,或者您可以通过 CreateLoadJobOptions 指定您自己的作业 ID。

第二次失败时间要简单得多:保留返回的BigQueryJob,以便在失败时重试轮询。 (例如,您可以存储作业名称,以便即使您的进程在等待它完成时死亡,您也可以恢复。)

【讨论】:

【参考方案2】:

您可以通过基于例如生成自己的 jobid 来实现幂等性。您加载的文件位置和目标表。

job_id = 'my_load_job_'.format(hashlib.md5(sourceUri+_projectId+_datasetId+tableId).hexdigest())
var options = new CreateLoadJobOptions
        
            WriteDisposition = WriteDisposition.WriteAppend,
            CreateDisposition = CreateDisposition.CreateNever,
            SkipLeadingRows = 1,
            JobId = job_id, #add this
            SourceFormat = FileFormat.Csv,
            TimePartitioning = new TimePartitioning
            
                Type = _partitionByDayType,
                Field = timePartitionField
            
        ;

在这种情况下,如果您尝试重新插入相同的 job_id,则会出错。 如果池化失败,您还可以轻松生成此 job_id 以进行检查。

【讨论】:

当然 - 我很傻,因为我们确实会生成这些作业 ID... 谢谢@AlexeyMaloletkin,我会把它当作刺伤,并报告这对我来说是如何工作的。

以上是关于使用 Google.Cloud.BigQuery.V2 的 BigQuery 加载作业的幂等性的主要内容,如果未能解决你的问题,请参考以下文章

com.google.cloud.bigquery.BigQueryException:读取超时

BigQuery google.cloud.exceptions.ServiceUnavailable 503

使用 Python,将 google.cloud.bigquery.job.query.QueryJob 输出保存到本地 JSON 文件

如何使用 ruby​​ on rails 提供来自 google cloud bigquery 的分页结果

错误:模块“google.cloud.bigquery_storage”没有属性“BigQueryReadClient”

本地 Google App Engine:ImportError:没有名为 google.cloud.bigquery 的模块