当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

Posted

技术标签:

【中文标题】当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub【英文标题】:NACK not send back to Google Cloud Pub/Sub from Dataflow when there is an error in ParDo function 【发布时间】:2021-07-16 06:40:45 【问题描述】:

当 Dataflow 作业无法或​​不愿处理消息时,如何向 Pub/Sub 发送 NACK。

Pipeline pipeline = Pipeline.create(options);

    pipeline.apply("gcs2ZipExtractor-processor",
            PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(pubSubSubscription))
           .apply(ParDo.of(new ProcessZipFileEventDoFn(appProps)));
    logger.info("Started ZipFile Extractor");
    pipeline.run().waitUntilFinish();

上面是我用来运行 ApacheBeam Dataflow 管道作业的代码 sn-p。如果 ProcessZipFileEventDoFn 发生任何故障,我想向 Pub/Sub 订阅发送 NACK 消息,以便将消息移动到 DeadletterTopic。目前,Dataflow Runner 没有发生 NACK。

【问题讨论】:

【参考方案1】:

目前,Apache Beam SDK 不支持 Pub/Sub 的原生死信队列功能。但是,您可以相当轻松地编写自己的代码。以下内容来自此blog post 改编为您的代码。诀窍是使用来自单个 ParDo 的多个输出。一个输出 PCollection 将具有不引发任何异常的“好”数据。如果有任何异常,另一个输出 PCollection 将包含所有“坏”数据。然后,您可以将死信 PCollection 中的所有元素写入接收器,在您的情况下为 Pub/Sub 主题。

PCollection input =
    pipeline.apply("gcs2ZipExtractor-processor",
                   PubsubIO.readMessagesWithAttributes()
                       .fromSubscription(pubSubSubscription))

// Put this try-catch logic in your ProcessZipFileEventDoFn, and don't forget
// the "withOutputTags"!
final TupleTag successTag ;
final TupleTag deadLetterTag;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn()   
  @Override  
  void processElement(ProcessContext c)     
  try       
    c.output(process(c.element());    
   catch (Exception e)       
    c.sideOutput(deadLetterTag, c.element());  
  
).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to Pub/Sub for later analysis
outputTuple.get(deadLetterTag).apply(PubSubIO.write(...));

// Retrieve the successful elements...
PCollection success = outputTuple.get(successTag);

【讨论】:

以上是关于当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub的主要内容,如果未能解决你的问题,请参考以下文章

Cloud Pub/Sub 在回调中缺少 ack/nack 不会导致重新交付

侧输入的高效 ParDo 设置或 start_bundle

关于ORACLE中SELECT INTO 语句有使用集合函数时不会出现no data found 异常

RabbitMQ 上的 Nack 和拒绝

rabbitmq消费端的nack和重回队列的总结

插入 CASE 函数时 SQL 代码不会运行?