Apache Beam 数据流:从 Azure 到 GCS 的文件传输
Posted
技术标签:
【中文标题】Apache Beam 数据流:从 Azure 到 GCS 的文件传输【英文标题】:Apache beam Dataflow : File Transfer from Azure to GCS 【发布时间】:2021-08-03 09:02:30 【问题描述】:我尝试将文件从 Azure 容器传输到 GCS 存储桶,但最终出现以下问题
-
源文件中的记录顺序与目标文件的记录顺序不同,因为管道将进行并行处理
必须编写大量自定义代码来为 GCS 目标文件提供自定义名称,因为管道为其提供默认名称。
无论如何,Apache 管道可以在不处理文件内容的情况下传输文件本身(这样就不会发生上述问题)?因为我需要将多个文件从 Azure 容器传输到 GCS 存储桶
下面是我目前用来传输文件的代码
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY_MM_DD_HH_MM_SS3")).toString();
String connectionString = "<<AZURE_STORAGE_CONNECTION_STRING>>";
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString(connectionString);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("azfs://storageaccountname/containername/CSVSample.csv"))
.apply("",FileIO.<String>write().to("azfs://storageaccountname/containername/"+format+"/").withNumShards(1).withSuffix(".csv")
.via(TextIO.sink()));
p.run().waitUntilFinish();
【问题讨论】:
【参考方案1】:您应该能够为此目的使用FileIO 转换。
例如(未经测试的伪代码),
FileIO.match().filepattern("azfs://storageaccountname/containername/CSVSample.csv")
.apply(FileIO.readMatches())
.apply(ParDo.of(new MyWriteDoFn()));
MyWriteDoFn()
上方是 DoFn
,它从单个文件中读取字节(使用 AzureBlobStoreFileSystem)并写入 GCS(使用 GCSFileSystem)。您可以使用 FileSystems 类中带有正确前缀的静态方法,而不是直接调用底层 FileSystem 实现的方法。
【讨论】:
以上是关于Apache Beam 数据流:从 Azure 到 GCS 的文件传输的主要内容,如果未能解决你的问题,请参考以下文章
从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名
Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件