How to upload a file to Azure Blob Storage with Apache Beam?

Posted: 2022-01-12 05:06:50

Question:

I want to upload a file to Azure Blob Storage with Apache Beam, but it fails. Why?

I have set the correct environment variables.

The az command works fine:

$ az storage blob upload \
-c $AZURE_STORAGE_CONTAINER_NAME \
-f example.json -n example.json \
--account-name $AZURE_STORAGE_ACCOUNT \
--account-key $AZURE_STORAGE_KEY
Finished[#############################################################]  100.0000%

{
  "etag": "\"0x8D9B92C4C0BE870\"",
  "lastModified": "2021-12-07T02:50:15+00:00"
}

However, when I run the following command:

$ mvn compile exec:java -Dexec.mainClass=jp.example.Indexer \
-Dexec.args="--runner=DirectRunner \
--destination=azfs://$AZURE_STORAGE_ACCOUNT/$AZURE_STORAGE_CONTAINER_NAME/example.json \
--source=example.json \
--azureConnectionString=$AZURE_CONNECTION_STRING \
--sasToken=$AZURE_STORAGE_SAS_TOKEN \
--accessKey=$AZURE_STORAGE_KEY \
--accountName=$AZURE_STORAGE_ACCOUNT"

I get the following error:

[WARNING] 
java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON.
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:171)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
    at jp.example.Indexer.main (Indexer.java:24)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c'
    at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE (JsonMappingException.java:334)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes (ObjectMapper.java:3769)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:168)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
    at jp.example.Indexer.main (Indexer.java:24)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  13.564 s
[INFO] Finished at: 2021-12-07T11:58:57+09:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project Indexer: An exception occured while executing the Java class. PipelineOptions specified failed to serialize to JSON.: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c' -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Here is Indexer.java:

package jp.example;

import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class Indexer {

    private static final Logger LOG = Logger.getLogger(Indexer.class.getName());

    public static void main(String[] args) {
        // Parse the command-line flags into the custom options interface.
        ToAzurePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(ToAzurePipelineOptions.class);
        Pipeline p = Pipeline.create(options);

        // Read the source file line by line.
        PCollection<String> lines = p.apply(TextIO.read().from(options.getSource()));

        // Write the lines to the azfs:// destination.
        lines.apply(TextIO.write().to(options.getDestination()));

        p.run();
    }
}

Here is ToAzurePipelineOptions.java:

package jp.example;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

public interface ToAzurePipelineOptions extends DataflowPipelineOptions, BlobstoreOptions {

    @Description("Root path of data files")
    @Default.String("file://hoge")
    String getSource();

    void setSource(String value);

    @Description("Address of Azure Storage")
    @Default.String("azfs://hoge")
    String getDestination();

    void setDestination(String value);
}

The versions of beam-sdks-java-io-azure and beam-sdks-java-core are both 2.31.0.


Answer 1:

I think this may be because TokenCredentialSerializer was only implemented in Beam 2.33.0. Could you upgrade your Beam dependencies to at least Beam 2.33.0 and see whether that resolves the issue?

Comments:

Thanks for the tip! I set the Beam version to 2.33.0 and the beam-sdks-java-io-azure version to 2.34.0. That solved it!
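For anyone landing here later, the upgrade described above might look roughly like this in pom.xml. This is only a sketch: the thread never shows the asker's pom.xml, so the layout, the `beam.version` property name, and the inclusion of beam-runners-direct-java (assumed because the command uses DirectRunner) are assumptions. The version numbers are the ones reported in the comment; keeping every Beam artifact on the same release is the more usual setup.

```xml
<!-- Hypothetical pom.xml fragment; adapt to your own project layout. -->
<properties>
  <!-- "beam.version" is an illustrative property name, not from the question. -->
  <beam.version>2.33.0</beam.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-azure</artifactId>
    <!-- 2.34.0 is the version the asker reported using. -->
    <version>2.34.0</version>
  </dependency>
</dependencies>
```

After changing the versions, re-running the same mvn compile exec:java command from the question should show whether the 'azureCredentialsProvider' serialization error is gone.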
