How to get all file metadata from a PCollection<String> in Beam
Asked: 2020-07-10 18:56:12
I have a flattened PCollection containing file paths:
// PCollection<String> contents: "/this/is/a/123/*.csv , /this/is/a/124/*.csv"
PCollection<String> flattenPCollection = pcs.apply(Flatten.<String>pCollections());
I want to read each file, get its file name, and process it:
flattenPCollection
    .apply("Read CSV files", FileIO.matchAll())
    .apply("Read matching files", FileIO.readMatches())
    .apply("Process each file", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
      @ProcessElement
      public void process(@Element FileIO.ReadableFile file) {
        // We should be able to access the file and its metadata here.
        logger.info("File Metadata resourceId is " + file.getMetadata().resourceId());
        // Here we read each line and process it.
      }
    }));
This fails with the following error:
Caused by: java.io.FileNotFoundException: No files matched spec: bob,22,new york
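The pipeline seems to be reading the first line of a CSV file and then looking for that string in the file system.
Why does this happen?
I want to get each file as a FileIO.ReadableFile.
I'm sure I'm missing something very simple. Any help is appreciated.
Update
If you have a collection of paths and file patterns, you need to iterate over each one manually and add a ParDo for it (pathList below is an ordinary Java collection of path strings):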
for (String path : pathList) {
  pipeline.apply(FileIO.match().filepattern(path))
      .apply(FileIO.readMatches())
      .apply(
          ParDo.of(
              new DoFn<FileIO.ReadableFile, String>() {
                @ProcessElement
                public void process(@Element FileIO.ReadableFile file) throws IOException {
                  logger.info("Metadata - " + file.getMetadata());
                  logger.info("File Contents - " + file.readFullyAsUTF8String());
                  logger.info("File Metadata resourceId is " + file.getMetadata().resourceId());
                }
              }));
}
Thanks, @bigbounty.
Comments:
***.com/questions/53404579/…
Answer 1:
Pipeline pipeline = Pipeline.create();
pipeline.apply(FileIO.match().filepattern("/Users/bigbounty/Documents/beam/files/*.txt"))
    .apply(FileIO.readMatches())
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) throws IOException {
                // Log the match metadata, the full file contents, and the resource id.
                LOG.info("Metadata - " + file.getMetadata());
                LOG.info("File Contents - " + file.readFullyAsUTF8String());
                LOG.info("File Metadata resourceId is " + file.getMetadata().resourceId());
              }
            }));
// Run the pipeline and block until it completes.
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
Output:
Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/3.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/1.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/2.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
File Contents - hello-1
File Metadata resourceId is /Users/bigbounty/Document/beam/files/1.txt
File Contents - hello-2
File Metadata resourceId is /Users/bigbounty/Document/beam/files/2.txt
File Contents - hello-3
File Metadata resourceId is /Users/bigbounty/Document/beam/files/3.txt
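On the error in the question: FileIO.matchAll() treats every element of its input PCollection as a file pattern, so "No files matched spec: bob,22,new york" suggests that the collection contained lines of file content rather than paths. That is only a likely reading, since the code that builds the collection is not shown. The sketch below is a plain-Java analogy that does not use Beam at all; the class GlobSpecDemo and the method matchSpec are hypothetical names made up for illustration. It shows that a real glob finds the file, while a CSV data line used as a pattern finds nothing.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java analogy (not Beam code): every input string is used as a file pattern. */
final class GlobSpecDemo {

  /** Returns the sorted names of entries directly under dir whose name matches the glob spec. */
  static List<String> matchSpec(Path dir, String spec) throws IOException {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + spec);
    try (Stream<Path> entries = Files.list(dir)) {
      return entries
          .map(Path::getFileName)
          .filter(matcher::matches)
          .map(Path::toString)
          .sorted()
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("glob-spec-demo");
    Files.writeString(dir.resolve("people.csv"), "bob,22,new york\n");

    // A real file pattern matches the file: prints [people.csv]
    System.out.println(matchSpec(dir, "*.csv"));
    // A line of CSV content used as a pattern matches nothing: prints []
    System.out.println(matchSpec(dir, "bob,22,new york"));
  }
}
```

If this is what happened, it is worth checking which step produces the PCollection<String> that is passed to FileIO.matchAll(), and making sure that step emits path patterns rather than the lines read from the files.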