Apache Beam TextIO glob 获取原始文件名

Posted

技术标签:

【中文标题】Apache Beam TextIO glob 获取原始文件名【英文标题】:Apache Beam TextIO glob get original filename 【发布时间】:2017-10-30 19:28:27 【问题描述】:

我已经设置了一个管道。我必须解析数百个 *.gz 文件。因此 glob 工作得很好。

但我需要当前处理文件的原始名称,因为我想将结果文件命名为原始文件。

有人可以帮我吗?

这是我的代码。

@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);


    TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile());
        read.getName();

        p.apply("ReadLines", read).apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));

    p.run().waitUntilFinish();

【问题讨论】:

【参考方案1】:

这可以从 Beam 2.2 开始使用 FileIO.match()FileIO.read() 和自定义代码的组合来读取文本行。您已经可以在 HEAD 使用它,或者您可以等到 2.2 版最终确定(目前正在进行中)。

PCollection<KV<String, String>> filesAndLines = 
  p.apply(FileIO.match().filepattern(...))
   .apply(FileIO.read())
   .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() 
     @ProcessElement
     public void process(ProcessContext c) 
       ReadableFile f = c.element();
       String filename = f.getMetadata().resourceId().toString();
       String line;
       try (BufferedReader r = new BufferedReader(Channels.newInputStream(f.open()))) 
         while ((line = r.readLine()) != null) 
           c.output(KV.of(filename, line));
         
       
     
   ));

【讨论】:

您是否会知道现在是否有一种简单的方法可以解决我的 python 问题? ***.com/questions/53404579/… 使用您的解决方案,最终输出是一个键值对。为了将实际行(值)写入适当的文件(键),我需要一个自定义接收器吗?或者有没有更好的方法来指定需要写什么?

以上是关于Apache Beam TextIO glob 获取原始文件名的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Beam 的 Dataflow 批量加载的性能问题

使用 Apache Beam 的 Dataflow 批量加载的性能问题

Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnecti

在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”

如何运行 Apache Beam 集成测试?

Python 上的 Apache Beam 将 beam.Map 调用相乘