如何在 BigQuery 中获取文件加载插入失败的插入记录

Posted

技术标签:

【中文标题】如何在 BigQuery 中获取文件加载插入失败的插入记录【英文标题】:How to get failed insert record for file load insertion in BigQuery 【发布时间】:2021-07-18 09:47:31 【问题描述】:

我正在使用 Apache Beam (Java SDK) 通过批量加载方法(文件加载)在 BigQuery 中插入记录。我想检索那些在插入过程中失败的记录。

是否可以对失败的记录制定重试策略?

下面是我的代码:

public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) 


    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() 
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException 
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        
    ));


【问题讨论】:

【参考方案1】:

使用 getFailedInsertsWithErr() 方法,我们可以将失败的插入推送到另一个表以执行根本原因分析 (RCA),请查看here 了解更多详细信息。

Example:
// write failed rows with their error to error table                
writeResult
        .getFailedInsertsWithErr()
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
        .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

【讨论】:

您好兄弟,我们能够获得插入错误和记录,但只有在重试 1000 次后才能获得错误,这需要 7 个多小时。有没有办法在这个中设置重试策略。我正在加载文件(批量加载)。

以上是关于如何在 BigQuery 中获取文件加载插入失败的插入记录的主要内容,如果未能解决你的问题,请参考以下文章

解析 json 文件以获取要插入 bigquery 的正确列

如何在 C# 中提高数据流插入 Bigquery 表的性能

BigQuery 加载作业失败 - 无法访问 GCS 文件

如何使用 BigQuery Streaming 获取插入的行数

将 Avro 文件加载到 BigQuery 失败并出现内部错误

BigQuery 无法插入作业。工作流失败