访问数据流管道内的文件
Posted
技术标签:
【中文标题】访问数据流管道内的文件【英文标题】:Access File inside the dataflow pipeline 【发布时间】:2019-05-29 02:02:55 【问题描述】:我想在管道启动之前从临时位置下载某些文件。要在 ParDo 功能中读取的文件 .mmdb 文件。文件存储在 Google 存储上,但使用 .mmdb 文件的方法需要它们是一个 File(java.io) 对象。
如果我将它包含在 --filesToStage 中,它们可以作为 InputStream 拉链里面。我想以文件而不是 InputStream 的形式访问它们。 实现这一目标的最佳方法是什么?
我目前正在将文件下载到 ParDo 设置中工作人员的临时文件夹中。
【问题讨论】:
【参考方案1】:这是一个非常广泛和高级别的问题。答案取决于您使用文件的逻辑。 File
代表file on a filesystem,因此如果您有一个组件要求输入是File
的实例,那么将其写入本地临时文件夹是正确的做法。 Beam 没有为这种情况提供更好的抽象。
但是,我建议您考虑更新当前处理 Files
的逻辑,以便也接受其他类型的输入。您可能会遇到由于缺乏关注点分离和紧密耦合而导致的问题。也就是说,您有一个组件,它接收File
,打开它,在打开它时处理错误,读取它,从中解析数据,甚至可能验证和处理数据。所有这些都是单独的问题,可能应该由单独的组件处理,您可以在需要时将它们组合和替换在一起,例如:
通过这种方式,您可以轻松地为您的组件实现任何其他源,独立编写和测试它们。
例如,您可以将逻辑实现为 2 个连接 PCollections
,其中一个将直接从 GCS 位置读取,解析文本行,并在实际业务逻辑中处理它,然后再将其与另一个 @ 连接起来987654328@.
【讨论】:
理想情况下,该文件必须为每个工人读取一次。管道的输入是 pub/sub。 我对你的问题有点困惑,但你见过this【参考方案2】:我想我了解您正在/正在尝试做什么,并且我也希望这样做。
这对我有用(在 DoFn 的 setup() 方法中):
if(not FileSystems.exists(local_db_location) ):
with FileSystems.open( self._cloud_database_loc ) as af:
with FileSystems.create(local_db_location) as local_file:
try:
shutil.copyfileobj(af,local_file,length=131072)
except:
raise
else:
#DB exists
【讨论】:
以上是关于访问数据流管道内的文件的主要内容,如果未能解决你的问题,请参考以下文章