为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?
Posted
技术标签:
【中文标题】为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?【英文标题】:Why Does Apache Beam BigQueryIO Use Same JobId per Run?为什么 Apache Beam BigQueryIO 每次运行都使用相同的 JobId? 【发布时间】:2020-11-30 16:44:20 【问题描述】:我正在运行一个批处理 Dataflow 作业,该作业从 BigQuery 表中读取所有行,将它们转换为 JSON 字符串,然后将字符串写入 PubSub 主题。此模板将使用相同或不同的参数重复使用,并且应始终在 BigQuery 中发布找到的行,无论它是否与上一个作业的行相同。
我遇到的问题是,每次作业运行后,我都必须再次上传模板,否则下一个作业将成功,而无需从 BigQuery 读取任何内容,即使使用与第一个成功作业相同的参数也是如此。尝试创建 BigQueryIO 作业时,工作人员登录 Google Cloud Console 以获取第二个作业报告以下内容:
Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs.
根据Google Cloud documentation,409 错误是冲突或重复。用于两个 BigQueryIO 作业的作业 ID 完全相同,因此重复的作业 ID 会导致第二个 DataFlow 作业失败。
我已经尝试使用相同的模板使用不同的参数和表,但作业 ID 永远不会改变,直到我通过下面的 Maven 命令再次上传模板,每个 Google Cloud Documentation on Custom Templates
mvn compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.BigQuery \
-Dexec.args="--runner=DataflowRunner \
--project=my-project \
--stagingLocation=gs://bkt_test/my_test/staging \
--region=us-central1 \
--templateLocation=gs://bkt_test/my_test/templates/BigQueryToPubSub"
为什么每次运行的 BigQueryIO 作业 ID 不是唯一的?这是一个错误还是我可以指定作业 ID 的格式以确保唯一性?
代码参考:
public static void main(String[] args)
PipelineOptionsFactory.register(BigQueryToPubSubOptions.class);
BigQueryToPubSubOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToPubSubOptions.class);
Pipeline pipe = Pipeline.create(options);
pipe
.apply(
"Read BigQuery Rows",
BigQueryIO
.readTableRows()
.from(options.getInputTable())
.withoutValidation()
)
.apply(
"Convert Row to String",
MapElements.via(new SimpleFunction<TableRow, String>()
@Override
public String apply(TableRow input)
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();
input.forEach((key, value) ->
LOG.info("Column: " + key + "\tValue: " + value);
json.put(key, (String) value);
);
try
return mapper.writeValueAsString(json);
catch (JsonProcessingException e)
LOG.error("Failed to convert row to json", e);
return null;
)
)
.apply(
"Publish to Topic",
PubsubIO
.writeStrings()
.to(options.getOutputTopic())
)
.getPipeline()
.run();
public interface BigQueryToPubSubOptions extends PipelineOptions
@Description("The BigQuery input table to read from")
ValueProvider<String> getInputTable();
void setInputTable(ValueProvider<String> intputTable);
@Description("The output PubSub topic to write the rows to")
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> outputTopic);
【问题讨论】:
【参考方案1】:此问题已在此处得到解答:https://***.com/a/62665445/14736159
将 .withTemplateCompatibility() 添加到 BigQueryIO.read 和 BigQueryIO.readTableRows 可解决 BigQuery 作业 ID 重复问题。
我的代码调整如下所示
pipe
.apply(
"Read BigQuery Rows",
BigQueryIO
.readTableRows()
.from(options.getInputTable())
.withTemplateCompatibility()
.withoutValidation()
)
【讨论】:
以上是关于为啥 Apache Beam BigQueryIO 每次运行都使用相同的 JobId?的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?
Apache Beam 的 BigQueryIO (Java):无法将 TIMESTAMP 字段写入 BigQuery——fastxml.jackson 异常“类型不支持”
Apache-beam Bigquery .fromQuery ClassCastException
Bigtable IO 连接器是不是有 Apache Beam DynamicDestinations?
Beam direct-runner 慢速 BigQuery 读取
Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnecti