Apache Beam TextIO.ReadAll(),处理丢失的文件名?

Posted

技术标签:

【中文标题】Apache Beam TextIO.ReadAll(),处理丢失的文件名?【英文标题】:Apache Beam TextIO.ReadAll(), handle missing filenames? 【发布时间】:2018-10-01 15:58:09 【问题描述】:

我有一个从 GCS 存储桶中的特定文件读取数据的管道步骤块。代码如下所示:

List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");

return pipeline
    .apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
    .apply("GatherFileData", TextIO.readAll())        
    .apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
    .apply("Group", GroupByKey.<String, String>create())
    .apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));

如果 GCS 中缺少其中一个文件路径(例如,缺少“filepathMissing”),则整个管道会崩溃。我正在考虑围绕这组管道步骤添加一个 try/catch,但我不确定这样做的后果。

我的问题:

    在使用TextIO.readAll() 时,try/catch 是否是对可能丢失的文件提供错误处理的正确方法? 使用 try/catch,如果无法从 GCS 中找到一个文件,上述整套步骤会失败吗?

如果有任何关于如何执行此操作的特定文档,请将其与您的答案链接:)

【问题讨论】:

【参考方案1】:

我最终找到了上面问题的答案。

我不得不在TextIO.readAll()之后添加代码.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW),如下图。

List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");

return pipeline
    .apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
    .apply("GatherFileData", TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))        
    .apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
    .apply("Group", GroupByKey.<String, String>create())
    .apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));

此修复的不幸问题是,如果缺少文件,您的管道将在管道步骤输出一条 INFO 消息,但不会在管道之外引发错误。如果您希望该文件在那里,您可能不知道该文件未被包含,除非您明确检查数据流中的管道步骤日志。

【讨论】:

以上是关于Apache Beam TextIO.ReadAll(),处理丢失的文件名?的主要内容,如果未能解决你的问题,请参考以下文章

如何运行 Apache Beam 集成测试?

Python 上的 Apache Beam 将 beam.Map 调用相乘

Apache Beam - 跳过管道步骤

什么是 Apache Beam? [关闭]

apache beam ElasticSearchIO 遇到异常后job中断执行 自己定制beam IO

数据流管道上的 Apache Beam StatusRuntimeException