访问数据流管道内的文件

Posted

技术标签:

【中文标题】访问数据流管道内的文件【英文标题】:Access File inside the dataflow pipeline 【发布时间】:2019-05-29 02:02:55 【问题描述】:

我想在管道启动之前从临时位置下载某些文件。要在 ParDo 功能中读取的文件 .mmdb 文件。文件存储在 Google 存储上,但使用 .mmdb 文件的方法需要它们是一个 File(java.io) 对象。

如果我将它包含在 --filesToStage 中,它们可以作为 InputStream 拉链里面。我想以文件而不是 InputStream 的形式访问它们。 实现这一目标的最佳方法是什么?

我目前正在将文件下载到 ParDo 设置中工作人员的临时文件夹中。

【问题讨论】:

【参考方案1】:

这是一个非常广泛和高级别的问题。答案取决于您使用文件的逻辑。 File 代表file on a filesystem,因此如果您有一个组件要求输入是File 的实例,那么将其写入本地临时文件夹是正确的做法。 Beam 没有为这种情况提供更好的抽象。

但是,我建议您考虑更新当前处理 Files 的逻辑,以便也接受其他类型的输入。您可能会遇到由于缺乏关注点分离和紧密耦合而导致的问题。也就是说,您有一个组件,它接收File,打开它,在打开它时处理错误,读取它,从中解析数据,甚至可能验证和处理数据。所有这些都是单独的问题,可能应该由单独的组件处理,您可以在需要时将它们组合和替换在一起,例如:

一个知道如何处理文件系统并将路径转换为字节流的类; 知道如何处理通过 http 获取文件(例如 GCS 用例)并将其转换为字节流的类似类; 一个知道如何将字节流解析为数据的组件; 处理解析数据的组件; 其他东西可能在任何地方都可以生存;

通过这种方式,您可以轻松地为您的组件实现任何其他源,独立编写和测试它们。

例如,您可以将逻辑实现为 2 个连接 PCollections,其中一个将直接从 GCS 位置读取,解析文本行,并在实际业务逻辑中处理它,然后再将其与另一个 @ 连接起来987654328@.

【讨论】:

理想情况下,该文件必须为每个工人读取一次。管道的输入是 pub/sub。 我对你的问题有点困惑,但你见过this【参考方案2】:

我想我了解您正在/正在尝试做什么,并且我也希望这样做。

这对我有用(在 DoFn 的 setup() 方法中):

 if(not FileSystems.exists(local_db_location) ):
        with FileSystems.open(  self._cloud_database_loc ) as af:
            with FileSystems.create(local_db_location) as local_file:
                try:
                    shutil.copyfileobj(af,local_file,length=131072)
                except:
                    raise
    else:
        #DB exists

【讨论】:

以上是关于访问数据流管道内的文件的主要内容,如果未能解决你的问题,请参考以下文章

访问元数据会导致 Azure 数据工厂的嵌套管道

gitlab yaml管道内的while循环

for循环内的多处理

AWS 数据管道。 EC2Resource 无法访问红移

访问填充了从管道读取的数据的结构时出现分段错误

Bigquery - 在 CSV(联合表)中处理双引号和管道字段分隔符