为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?

Posted

技术标签:

【中文标题】为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?【英文标题】:Why Does Apache Beam BigQueryIO Use Same JobId per Run?为什么 Apache Beam BigQueryIO 每次运行都使用相同的 JobId? 【发布时间】:2020-11-30 16:44:20 【问题描述】:

我正在运行一个批处理 Dataflow 作业,该作业从 BigQuery 表中读取所有行,将它们转换为 JSON 字符串,然后将字符串写入 PubSub 主题。此模板将使用相同或不同的参数重复使用,并且应始终在 BigQuery 中发布找到的行,无论它是否与上一个作业的行相同。

我遇到的问题是,每次作业运行后,我都必须再次上传模板,否则下一个作业将成功,而无需从 BigQuery 读取任何内容,即使使用与第一个成功作业相同的参数也是如此。尝试创建 BigQueryIO 作业时,工作人员登录 Google Cloud Console 以获取第二个作业报告以下内容:

Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs.

根据Google Cloud documentation,409 错误是冲突重复。用于两个 BigQueryIO 作业的作业 ID 完全相同,因此重复的作业 ID 会导致第二个 DataFlow 作业失败。

我已经尝试使用相同的模板使用不同的参数和表,但作业 ID 永远不会改变,直到我通过下面的 Maven 命令再次上传模板,每个 Google Cloud Documentation on Custom Templates

mvn compile exec:java \
     -Dexec.mainClass=org.apache.beam.examples.BigQuery \
     -Dexec.args="--runner=DataflowRunner \
                  --project=my-project \
                  --stagingLocation=gs://bkt_test/my_test/staging \
                  --region=us-central1 \
                  --templateLocation=gs://bkt_test/my_test/templates/BigQueryToPubSub"

为什么每次运行的 BigQueryIO 作业 ID 不是唯一的?这是一个错误还是我可以指定作业 ID 的格式以确保唯一性?

代码参考:

public static void main(String[] args) 
        PipelineOptionsFactory.register(BigQueryToPubSubOptions.class);
        BigQueryToPubSubOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToPubSubOptions.class);
        Pipeline pipe = Pipeline.create(options);

        pipe
                .apply(
                        "Read BigQuery Rows",
                        BigQueryIO
                                .readTableRows()
                                .from(options.getInputTable())
                                .withoutValidation()
                )
                .apply(
                        "Convert Row to String",
                        MapElements.via(new SimpleFunction<TableRow, String>() 
                            @Override
                            public String apply(TableRow input) 
                                ObjectMapper mapper = new ObjectMapper();
                                ObjectNode json = mapper.createObjectNode();
                                input.forEach((key, value) -> 
                                    LOG.info("Column: " + key + "\tValue: " + value);
                                    json.put(key, (String) value);
                                );
                                try 
                                    return mapper.writeValueAsString(json);
                                 catch (JsonProcessingException e) 
                                    LOG.error("Failed to convert row to json", e);
                                    return null;
                                
                            
                        )
                )
                .apply(
                        "Publish to Topic",
                        PubsubIO
                                .writeStrings()
                                .to(options.getOutputTopic())
                )
                .getPipeline()
                .run();
    

public interface BigQueryToPubSubOptions extends PipelineOptions 
        @Description("The BigQuery input table to read from")
        ValueProvider<String> getInputTable();

        void setInputTable(ValueProvider<String> intputTable);

        @Description("The output PubSub topic to write the rows to")
        ValueProvider<String> getOutputTopic();

        void setOutputTopic(ValueProvider<String> outputTopic);
    

【问题讨论】:

【参考方案1】:

此问题已在此处得到解答:https://***.com/a/62665445/14736159

将 .withTemplateCompatibility() 添加到 BigQueryIO.read 和 BigQueryIO.readTableRows 可解决 BigQuery 作业 ID 重复问题。

我的代码调整如下所示

pipe
    .apply(
        "Read BigQuery Rows",
        BigQueryIO
                .readTableRows()
                .from(options.getInputTable())
                .withTemplateCompatibility()
                .withoutValidation()
    )

【讨论】:

以上是关于为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?

Apache Beam 的 BigQueryIO (Java):无法将 TIMESTAMP 字段写入 BigQuery——fastxml.jackson 异常“类型不支持”

Apache-beam Bigquery .fromQuery ClassCastException

Bigtable IO 连接器是不是有 Apache Beam DynamicDestinations?

Beam direct-runner 慢速 BigQuery 读取

Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnecti