在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名

Posted

技术标签:

【中文标题】在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名【英文标题】:How to Get Filename when using file pattern match in google-cloud-dataflow 【发布时间】:2015-05-01 08:13:45 【问题描述】:

有人知道吗?

我是使用数据流的新手。使用文件模式匹配时如何获取文件名,以这种方式。

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))

我想知道我如何检测 kinglear.txt、Hamlet.txt 等文件名。

【问题讨论】:

【参考方案1】:

如果您想简单地扩展文件模式并获得与其匹配的文件名列表,您可以使用GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt")(请参阅GcsIoChannelFactory)。

如果您想从管道中的 DoFn 下游之一访问“当前文件名” - 目前不支持(尽管有一些解决方法 - 见下文)。这是一个常见的功能请求,我们仍在考虑如何以自然、通用和高性能的方式将其最好地融入框架。

一些解决方法包括:

像这样编写管道(tf-idf 示例使用这种方法): DoFn readFile = ...(获取文件名,读取文件并生成记录)... p.apply(创建.of(文件名)) .apply(ParDo.of(readFile)) .apply(管道的其余部分)

这有一个缺点,即动态工作再平衡功能不会特别好,因为它们目前仅适用于 Read PTransform 的级别,但不适用于具有高扇出的 ParDo 级别(就像这里的那个,它将读取文件并生成所有记录);并且并行化仅适用于文件级别,但不会将文件拆分为子范围。在阅读莎士比亚的规模上,这不是问题,但如果您正在阅读一组大小截然不同的文件,有些文件非常大,那么它可能会成为问题。

实现您自己的FileBasedSource (javadoc, general documentation),它将返回类似Pair<String, T> 类型的记录,其中String 是文件名,T 是您正在阅读的记录。在这种情况下,框架会为您处理文件模式匹配,动态工作再平衡可以正常工作,但是您可以在 FileBasedReader 中编写读取逻辑。

这两种变通方法都不理想,但根据您的要求,其中一种可能会为您解决问题。

【讨论】:

@jkff:这是否意味着我们也失去了TextIO基于文件扩展名的自动压缩检测? @hraban 不幸的是,是的 - 在这两种情况下。我们正在考虑对源和接收器的处理方式进行一些根本性的改进,这将解决这类问题以及更多问题,但目前还没有时间表。 @jkff 这个功能已经在 J​​ava 和/或 Python 中实现了吗? 尚未实施,但将作为issues.apache.org/jira/browse/BEAM-65 的一部分实施,目前正在积极开发中。 @jkff 你知道我会为这个问题做些什么吗,有点类似但不确定新版本是否会改变***.com/questions/53404579/…【参考方案2】:

基于最新 SDK 更新 Java(SDK 2.9.0):

Beams TextIO 阅读器无法访问文件名本身,对于这些用例,我们需要使用 FileIO 来匹配文件并访问存储在文件名中的信息。与 TextIO 不同,文件的读取需要用户在 FileIO 读取下游的转换中处理。 FileIO 读取的结果是 PCollection,ReadableFile 类包含文件名作为元数据,可以与文件内容一起使用。

FileIO 确实有一个方便的方法 readFullyAsUTF8String() ,它将整个文件读入 String 对象,这将首先将整个文件读入内存。如果内存是一个问题,您可以使用 FileSystems 等实用程序类直接处理文件。

发件人:Document Link

PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(KVs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

Python (sdk 2.9.0):

对于 Python 的 2.9.0,您需要从 Dataflow 管道外部收集 URI 列表,并将其作为参数提供给管道。例如,使用 FileSystems 通过 Glob 模式读取文件列表,然后将其传递给 PCollection 进行处理。

一旦 fileio see PR https://github.com/apache/beam/pull/7791/ 可用,以下代码也将是 python 的一个选项。

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  readable_files = (p 
                    | fileio.MatchFiles(‘hdfs://path/to/*.txt’)
                    | fileio.ReadMatches()
                    | beam.Reshuffle())
  files_and_contents = (readable_files 
                        | beam.Map(lambda x: (x.metadata.path, 
                                              x.read_utf8()))

【讨论】:

【参考方案3】:

一种方法是构建一个List&lt;PCollection&gt;,其中每个条目对应一个输入文件,然后使用Flatten。例如,如果您想将文件集合的每一行解析为 Foo 对象,您可以执行以下操作:

public static class FooParserFn extends DoFn<String, Foo> 
  private String fileName;
  public FooParserFn(String fileName) 
    this.fileName = fileName;
  

  @Override
  public void processElement(ProcessContext processContext) throws Exception 
    String line = processContext.element();
    // here you have access to both the line of text and the name of the file
    // from which it came.
  


public static void main(String[] args) 
  ...
  List<String> inputFiles = ...;
  List<PCollection<Foo>> foosByFile =
          Lists.transform(inputFiles,
          new Function<String, PCollection<Foo>>() 
            @Override
            public PCollection<Foo> apply(String fileName) 
              return p.apply(TextIO.Read.from(fileName))
                      .apply(new ParDo().of(new FooParserFn(fileName)));
            
          );

  PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
  ...

这种方法的一个缺点是,如果您有 100 个输入文件,那么 Cloud Dataflow 监控控制台中也会有 100 个节点。这使得很难判断发生了什么。我很想听听 Google Cloud Dataflow 人员的意见,这种方法是否有效。

【讨论】:

你知道我会为这个问题做些什么吗,有点类似,但不确定新版本是否会改变:***.com/questions/53404579/…【参考方案4】:

当使用类似于@danvk 的代码时,我在数据流图上也有 100 个输入文件 = 100 个节点。我切换到这样的方法,这导致所有读取被合并到一个块中,您可以展开该块以深入到读取的每个文件/目录。使用这种方法而不是我们用例中的 Lists.transform 方法,该作业也运行得更快。

GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());

PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) 
    pcl = pcl.and(
            p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
                    .from(fileName)
                    .withSchema(SomeClass.class)
            )
            .apply(ParDo.of(new MyDoFn(fileName)))
    );


// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());

【讨论】:

【参考方案5】:

对于上述问题,这可能是一个很晚的帖子,但我想用 Beam 捆绑类添加答案。

这也可以看作是从@Reza Rokni提供的解决方案中提取的代码。

PCollection<String> listOfFilenames =
    pipe.apply(FileIO.match().filepattern("gs://apache-beam-samples/shakespeare/*"))
        .apply(FileIO.readMatches())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (FileIO.ReadableFile file) -> 
                      String f = file.getMetadata().resourceId().getFilename();
                      System.out.println(f);
                      return f;
                    ));

pipe.run().waitUntilFinish();

PCollection&lt;String&gt; 上方将有一个在任何提供的目录中可用的文件列表。

【讨论】:

以上是关于在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名的主要内容,如果未能解决你的问题,请参考以下文章

在 Maven 中,如何在版本中命名战争文件与在快照构建中不同

存储在 plist 中的数据在模拟器中有效,但在设备中无效

在 C 中声明 == 在 C++ 中定义? [复制]

初始化发生在哪里,在 init 方法中还是在实例声明中?

查询在数据库中花费了更多时间,尽管在连接条件中使用了索引列,那么我们可以在代码中做些啥来优化

在 Athena 中,如何在结构中的数组中查询结构的成员?