在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息

Posted

技术标签:

【中文标题】在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息【英文标题】:Apache Beam writing status information after BQ writes are done within the dataflow 【发布时间】:2021-06-22 15:54:22 【问题描述】:

我正在努力为 BQ 写入完成后的写入状态找到一个好的解决方案。

每个数据流必须处理一个文件,并且在没有错误发生后,应该将状态写入Firestore。

我的代码如下所示:

PCollection<TableRow> failedInserts = results.getFailedInserts();

    failedInserts
    .apply("Set Global Window",
        Window.<TableRow>into(new GlobalWindows()))
    .apply("Count failures", Count.globally()).apply(ParDo.of(new DoFn<Long, ReportStatusInfo>() 


      @ProcessElement
      public void processElement(final ProcessContext c) throws IOException 
        Long errorNumbers = c.element();
        if (errorNumbers > 1) 
          //set status to failed
         else if (numberOfErrors == 0) 
        //set status to ok
        
        insert();
      
    ))

它似乎无法正常工作,因为我的印象是它不会等待整个 BQ 写入过程完成。

关于如何解决我在数据流中的问题或上述方法不起作用的任何其他想法?

【问题讨论】:

您观察到什么具体行为?这通常看起来像是观察失败插入的正确方法。 数据流失败,我的状态是“成功”。似乎使用 Streaming Inserts 方法可以更正常地工作 - 在测试此更改时没有观察到错误的结果,我确实在写入 BigQuery 仍在进行时强制数据流停止,结果符合预期。 【参考方案1】:

只有在使用流式插入时才支持 getFailedInserts 方法,而不是文件加载。在那种模式下,你的代码会做你想做的事

【讨论】:

以上是关于在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息的主要内容,如果未能解决你的问题,请参考以下文章

数据流 GCS 到 BQ 问题

从数据流管道写入 BQ 时的动态表名

使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误

BQ shell 使用 write_disposition 作为写入附加加载数据存储时出错

在datalab上安装be_helper

Apache Beam 处理文件