在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名
Posted
技术标签:
【中文标题】在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名【英文标题】:How to Get Filename when using file pattern match in google-cloud-dataflow 【发布时间】:2015-05-01 08:13:45 【问题描述】:有人知道吗?
我是使用数据流的新手。使用文件模式匹配时如何获取文件名,以这种方式。
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))
我想知道我如何检测 kinglear.txt、Hamlet.txt 等文件名。
【问题讨论】:
【参考方案1】:如果您想简单地扩展文件模式并获得与其匹配的文件名列表,您可以使用GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt")
(请参阅GcsIoChannelFactory)。
如果您想从管道中的 DoFn 下游之一访问“当前文件名” - 目前不支持(尽管有一些解决方法 - 见下文)。这是一个常见的功能请求,我们仍在考虑如何以自然、通用和高性能的方式将其最好地融入框架。
一些解决方法包括:
像这样编写管道(tf-idf 示例使用这种方法): DoFn readFile = ...(获取文件名,读取文件并生成记录)... p.apply(创建.of(文件名)) .apply(ParDo.of(readFile)) .apply(管道的其余部分)这有一个缺点,即动态工作再平衡功能不会特别好,因为它们目前仅适用于 Read PTransform 的级别,但不适用于具有高扇出的 ParDo 级别(就像这里的那个,它将读取文件并生成所有记录);并且并行化仅适用于文件级别,但不会将文件拆分为子范围。在阅读莎士比亚的规模上,这不是问题,但如果您正在阅读一组大小截然不同的文件,有些文件非常大,那么它可能会成为问题。
实现您自己的FileBasedSource
(javadoc, general documentation),它将返回类似Pair<String, T>
类型的记录,其中String
是文件名,T
是您正在阅读的记录。在这种情况下,框架会为您处理文件模式匹配,动态工作再平衡可以正常工作,但是您可以在 FileBasedReader
中编写读取逻辑。
这两种变通方法都不理想,但根据您的要求,其中一种可能会为您解决问题。
【讨论】:
@jkff:这是否意味着我们也失去了TextIO基于文件扩展名的自动压缩检测? @hraban 不幸的是,是的 - 在这两种情况下。我们正在考虑对源和接收器的处理方式进行一些根本性的改进,这将解决这类问题以及更多问题,但目前还没有时间表。 @jkff 这个功能已经在 Java 和/或 Python 中实现了吗? 尚未实施,但将作为issues.apache.org/jira/browse/BEAM-65 的一部分实施,目前正在积极开发中。 @jkff 你知道我会为这个问题做些什么吗,有点类似但不确定新版本是否会改变***.com/questions/53404579/…【参考方案2】:基于最新 SDK 更新 Java(SDK 2.9.0):
Beams TextIO 阅读器无法访问文件名本身,对于这些用例,我们需要使用 FileIO 来匹配文件并访问存储在文件名中的信息。与 TextIO 不同,文件的读取需要用户在 FileIO 读取下游的转换中处理。 FileIO 读取的结果是 PCollection,ReadableFile 类包含文件名作为元数据,可以与文件内容一起使用。
FileIO 确实有一个方便的方法 readFullyAsUTF8String() ,它将整个文件读入 String 对象,这将首先将整个文件读入内存。如果内存是一个问题,您可以使用 FileSystems 等实用程序类直接处理文件。
发件人:Document Link
PCollection<KV<String, String>> filesAndContents = p
.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(MapElements
// uses imports from TypeDescriptors
.into(KVs(strings(), strings()))
.via((ReadableFile f) -> KV.of(
f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
Python (sdk 2.9.0):
对于 Python 的 2.9.0,您需要从 Dataflow 管道外部收集 URI 列表,并将其作为参数提供给管道。例如,使用 FileSystems 通过 Glob 模式读取文件列表,然后将其传递给 PCollection 进行处理。
一旦 fileio see PR https://github.com/apache/beam/pull/7791/ 可用,以下代码也将是 python 的一个选项。
import apache_beam as beam
from apache_beam.io import fileio
with beam.Pipeline() as p:
readable_files = (p
| fileio.MatchFiles(‘hdfs://path/to/*.txt’)
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (readable_files
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8()))
【讨论】:
【参考方案3】:一种方法是构建一个List<PCollection>
,其中每个条目对应一个输入文件,然后使用Flatten
。例如,如果您想将文件集合的每一行解析为 Foo
对象,您可以执行以下操作:
public static class FooParserFn extends DoFn<String, Foo>
private String fileName;
public FooParserFn(String fileName)
this.fileName = fileName;
@Override
public void processElement(ProcessContext processContext) throws Exception
String line = processContext.element();
// here you have access to both the line of text and the name of the file
// from which it came.
public static void main(String[] args)
...
List<String> inputFiles = ...;
List<PCollection<Foo>> foosByFile =
Lists.transform(inputFiles,
new Function<String, PCollection<Foo>>()
@Override
public PCollection<Foo> apply(String fileName)
return p.apply(TextIO.Read.from(fileName))
.apply(new ParDo().of(new FooParserFn(fileName)));
);
PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
...
这种方法的一个缺点是,如果您有 100 个输入文件,那么 Cloud Dataflow 监控控制台中也会有 100 个节点。这使得很难判断发生了什么。我很想听听 Google Cloud Dataflow 人员的意见,这种方法是否有效。
【讨论】:
你知道我会为这个问题做些什么吗,有点类似,但不确定新版本是否会改变:***.com/questions/53404579/…【参考方案4】:当使用类似于@danvk 的代码时,我在数据流图上也有 100 个输入文件 = 100 个节点。我切换到这样的方法,这导致所有读取被合并到一个块中,您可以展开该块以深入到读取的每个文件/目录。使用这种方法而不是我们用例中的 Lists.transform 方法,该作业也运行得更快。
GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());
PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess)
pcl = pcl.and(
p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
.from(fileName)
.withSchema(SomeClass.class)
)
.apply(ParDo.of(new MyDoFn(fileName)))
);
// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());
【讨论】:
【参考方案5】:对于上述问题,这可能是一个很晚的帖子,但我想用 Beam 捆绑类添加答案。
这也可以看作是从@Reza Rokni提供的解决方案中提取的代码。
PCollection<String> listOfFilenames =
pipe.apply(FileIO.match().filepattern("gs://apache-beam-samples/shakespeare/*"))
.apply(FileIO.readMatches())
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
(FileIO.ReadableFile file) ->
String f = file.getMetadata().resourceId().getFilename();
System.out.println(f);
return f;
));
pipe.run().waitUntilFinish();
PCollection<String>
上方将有一个在任何提供的目录中可用的文件列表。
【讨论】:
以上是关于在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名的主要内容,如果未能解决你的问题,请参考以下文章
在 Maven 中,如何在版本中命名战争文件与在快照构建中不同