Apache Beam 数据流:从 Azure 到 GCS 的文件传输

Posted

技术标签:

【中文标题】Apache Beam 数据流:从 Azure 到 GCS 的文件传输【英文标题】:Apache beam Dataflow : File Transfer from Azure to GCS 【发布时间】:2021-08-03 09:02:30 【问题描述】:

我尝试将文件从 Azure 容器传输到 GCS 存储桶,但最终出现以下问题

    源文件中的记录顺序与目标文件的记录顺序不同,因为管道将进行并行处理 必须编写大量自定义代码来为 GCS 目标文件提供自定义名称,因为管道为其提供默认名称。

无论如何,Apache 管道可以在不处理文件内容的情况下传输文件本身(这样就不会发生上述问题)?因为我需要将多个文件从 Azure 容器传输到 GCS 存储桶

下面是我目前用来传输文件的代码

String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY_MM_DD_HH_MM_SS3")).toString();

String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>"; 
        
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);
        
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
.apply("",FileIO.<String>write().to("azfs://storageaccountname/containername/"+format+"/").withNumShards(1).withSuffix(".csv")
        .via(TextIO.sink()));
p.run().waitUntilFinish();

【问题讨论】:

【参考方案1】:

您应该能够为此目的使用FileIO 转换。

例如(未经测试的伪代码),

FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv")
.apply(FileIO.readMatches())
.apply(ParDo.of(new MyWriteDoFn()));

MyWriteDoFn() 上方是 DoFn,它从单个文件中读取字节(使用 AzureBlobStoreFileSystem)并写入 GCS(使用 GCSFileSystem)。您可以使用 FileSystems 类中带有正确前缀的静态方法,而不是直接调用底层 FileSystem 实现的方法。

【讨论】:

以上是关于Apache Beam 数据流:从 Azure 到 GCS 的文件传输的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Beam 从数据库中读取批量数据

Apache Beam 数据流 BigQuery

从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名

Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Apache Beam 中的窗口数据每小时(顺时针)基础