当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub
Posted
技术标签:
【中文标题】当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub【英文标题】:NACK not send back to Google Cloud Pub/Sub from Dataflow when there is an error in ParDo function 【发布时间】:2021-07-16 06:40:45 【问题描述】:当 Dataflow 作业无法或不愿处理消息时,如何向 Pub/Sub 发送 NACK。
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("gcs2ZipExtractor-processor",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(pubSubSubscription))
.apply(ParDo.of(new ProcessZipFileEventDoFn(appProps)));
logger.info("Started ZipFile Extractor");
pipeline.run().waitUntilFinish();
上面是我用来运行 ApacheBeam Dataflow 管道作业的代码 sn-p。如果 ProcessZipFileEventDoFn 发生任何故障,我想向 Pub/Sub 订阅发送 NACK 消息,以便将消息移动到 DeadletterTopic。目前,Dataflow Runner 没有发生 NACK。
【问题讨论】:
【参考方案1】:目前,Apache Beam SDK 不支持 Pub/Sub 的原生死信队列功能。但是,您可以相当轻松地编写自己的代码。以下内容来自此blog post 改编为您的代码。诀窍是使用来自单个 ParDo 的多个输出。一个输出 PCollection 将具有不引发任何异常的“好”数据。如果有任何异常,另一个输出 PCollection 将包含所有“坏”数据。然后,您可以将死信 PCollection 中的所有元素写入接收器,在您的情况下为 Pub/Sub 主题。
PCollection input =
pipeline.apply("gcs2ZipExtractor-processor",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(pubSubSubscription))
// Put this try-catch logic in your ProcessZipFileEventDoFn, and don't forget
// the "withOutputTags"!
final TupleTag successTag ;
final TupleTag deadLetterTag;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn()
@Override
void processElement(ProcessContext c)
try
c.output(process(c.element());
catch (Exception e)
c.sideOutput(deadLetterTag, c.element());
).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead letter inputs to Pub/Sub for later analysis
outputTuple.get(deadLetterTag).apply(PubSubIO.write(...));
// Retrieve the successful elements...
PCollection success = outputTuple.get(successTag);
【讨论】:
以上是关于当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub的主要内容,如果未能解决你的问题,请参考以下文章
Cloud Pub/Sub 在回调中缺少 ack/nack 不会导致重新交付