在 Dataflow 管道中写入 BigQuery 表失败

Posted

技术标签:

【中文标题】在 Dataflow 管道中写入 BigQuery 表失败【英文标题】:Write to BigQuery table is failing in Dataflow pipeline 【发布时间】:2021-10-28 16:28:19 【问题描述】:

我正在开发一个 Dataflow 管道,它正在从谷歌云存储读取一个 protobuf 文件并对其进行解析并尝试写入 BigQuery 表。没有时它工作正常。行数约为 20k 但没有。行数约为 200k,然后失败。下面是示例代码:

Pipeline pipeline = Pipeline.create(options);

        PCollection<PBClass> dataCol = pipeline.apply(FileIO.match().filepattern(options.getInputFile()))
                .apply(FileIO.readMatches())
                .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
                .apply(Flatten.iterables())
                .apply(
                        BigQueryIO
                                //.write()
                                .writeTableRows()
                                .to(deltaSchema.tableSpec)
                                .withMethod(Method.STORAGE_WRITE_API)
                                .withSchema(schema)
                                //.withFormatFunction(irParDeltaSchema)
                                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                                .withExtendedErrorInfo()
                )
        ;

尝试了以下方法的不同组合

withMethod
write
withFormatFunction

也不同。工人和不同的计算引擎类型。

每次卡在GroupByKey 阶段并给出以下错误:

Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
    org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

工作步骤表视图: 和图表视图:

【问题讨论】:

【参考方案1】:
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
            org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

您收到的错误代码 - 如上所述 - 是因为在您的代码中的某处,当您指定要加载的 GCS 文件时,它的格式不正确,URI 应该看起来像这样 gs:/ /bucket/path/to/file.

【讨论】:

从 gcs 读取不是问题,该作业能够从 GCS 读取文件,因为它能够写入表中没有较少编号的 1。行但无法写入其他没有的表。行数非常高 你要写的行数是有限制的,你可以试着分部分写行吗?

以上是关于在 Dataflow 管道中写入 BigQuery 表失败的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Dataflow - 是不是可以定义从 BigQuery 读取数据并写入本地数据库的管道?

流插入,然后定期合并到 Dataflow 管道中的 BigQuery [关闭]

在 Dataflow 流式传输管道上捕获 BigQuery HttpBadRequestError

数据流:我可以使用批处理作业连续写/流写入BigQuery吗?

Dataflow Bigquery-Bigquery 管道在较小的数据上执行,但不是在大型生产数据集上执行

Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象