将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]

Posted

技术标签:

【中文标题】将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]【英文标题】:GCP dataproc cluster hadoop job to move data from gs bucket to s3 amazon bucket fails [CONSOLE] 【发布时间】:2021-07-02 20:26:18 【问题描述】:

关于堆栈溢出的第一个问题,所以请原谅我的任何菜鸟错误。

我目前正在将大量数据(700+ GiB)从 GCS 存储桶中的一个文件夹移动到 s3 中的一个文件夹,其中包含许多大约 1-10MB 的小文件。

我做了几次尝试:

    gsutil -m rsync -r gs://<path> s3://<path> 数据量大导致超时 gsutil -m cp -r gs://<path> s3://<path> 耗时太长了。即使有许多并行进程和/或线程,它的平均传输速度仍然约为 3.4MiB/s。我已确保在此尝试中升级 VM 实例。 使用rclone 与 cp 相同的性能问题

最近我发现了另一种可能的方法。但是我对 GCP 不熟悉,所以请多多包涵,抱歉。 这是我找到的参考资料https://medium.com/swlh/transfer-data-from-gcs-to-s3-using-google-dataproc-with-airflow-aa49dc896dad 该方法包括通过 GCP 控制台使用以下配置创建一个 dataproc 集群:

Name:
    <dataproc-cluster-name>
Region:
    asia-southeast1
Nodes configuration:
    1 main 2 worker @2vCPU & @3.75GBMemory & @30GBPersistentDisk
properties:
    core    fs.s3.awsAccessKeyId        <key>
    core    fs.s3.awsSecretAccessKey    <secret>
    core    fs.s3.impl                  org.apache.hadoop.fs.s3.S3FileSystem

然后我通过 GCP 网站的控制台菜单提交作业:

    此时,我开始注意到问题,我在任何地方都找不到hadoop-mapreduce/hadoop-distcp.jar。我只能通过我的主 dataproc 集群 VM 实例浏览根文件来找到 /usr/lib/hadoop/hadoop-distcp.jar 我提交的工作:
Start time:
31 Mar 2021, 16:00:25
Elapsed time:
3 sec
Status:
Failed
Region
asia-southeast1
Cluster
<cluster-name>
Job type
Hadoop
Main class or JAR
file://usr/lib/hadoop/hadoop-distcp.jar
Arguments
-update
gs://*
s3://*

返回错误

/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2400: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2365: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2460: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_OPTS: invalid variable name
2021-03-31 09:00:28,549 ERROR tools.DistCp: Invalid arguments: 
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2638)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3342)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:126)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3425)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3393)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:240)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:143)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2542)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2636)
    ... 18 more
Invalid arguments: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
usage: distcp OPTIONS [source_path...] <target_path>
              OPTIONS
 -append                       Reuse existing data in target files and
                               append new data to them if possible
 -async                        Should distcp execution be blocking
 -atomic                       Commit all changes or none
 -bandwidth <arg>              Specify bandwidth per map in MB, accepts
                               bandwidth as a fraction.
 -blocksperchunk <arg>         If set to a positive value, fileswith more
                               blocks than this value will be split into
                               chunks of <blocksperchunk> blocks to be
                               transferred in parallel, and reassembled on
                               the destination. By default,
                               <blocksperchunk> is 0 and the files will be
                               transmitted in their entirety without
                               splitting. This switch is only applicable
                               when the source file system implements
                               getBlockLocations method and the target
                               file system implements concat method
 -copybuffersize <arg>         Size of the copy buffer to use. By default
                               <copybuffersize> is 8192B.
 -delete                       Delete from target, files missing in
                               source. Delete is applicable only with
                               update or overwrite options
 -diff <arg>                   Use snapshot diff report to identify the
                               difference between source and target
 -direct                       Write files directly to the target
                               location, avoiding temporary file rename.
 -f <arg>                      List of files that need to be copied
 -filelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n
 -filters <arg>                The path to a file containing a list of
                               strings for paths to be excluded from the
                               copy.
 -i                            Ignore failures during copy
 -log <arg>                    Folder on DFS where distcp execution logs
                               are saved
 -m <arg>                      Max number of concurrent maps to use for
                               copy
 -numListstatusThreads <arg>   Number of threads to use for building file
                               listing (max 40).
 -overwrite                    Choose to overwrite target files
                               unconditionally, even if they exist.
 -p <arg>                      preserve status (rbugpcaxt)(replication,
                               block-size, user, group, permission,
                               checksum-type, ACL, XATTR, timestamps). If
                               -p is specified with no <arg>, then
                               preserves replication, block size, user,
                               group, permission, checksum type and
                               timestamps. raw.* xattrs are preserved when
                               both the source and destination paths are
                               in the /.reserved/raw hierarchy (HDFS
                               only). raw.* xattrpreservation is
                               independent of the -p flag. Refer to the
                               DistCp documentation for more details.
 -rdiff <arg>                  Use target snapshot diff report to identify
                               changes made on target
 -sizelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n bytes
 -skipcrccheck                 Whether to skip CRC checks between source
                               and target paths.
 -strategy <arg>               Copy strategy to use. Default is dividing
                               work based on file sizes
 -tmp <arg>                    Intermediate work path to be used for
                               atomic commit
 -update                       Update target, copying only missing files
                               or directories
 -v                            Log additional info (path, size) in the
                               SKIP/COPY log
 -xtrack <arg>                 Save information about missing source files
                               to the specified directory

我该如何解决这个问题?我在网上找到的几个修复都不是很有帮助。他们要么使用 hadoop cli,要么使用不同的 jar 文件作为我的。例如这里的这个:Move data from google cloud storage to S3 using dataproc hadoop cluster and airflow 和 https://github.com/CoorpAcademy/docker-pyspark/issues/13

免责声明:我不使用 hadoop cli 或气流。我使用控制台来执行此操作,通过 dataproc 集群主 VM 实例 shell 提交作业也返回相同的错误。如果需要,任何详细的参考将不胜感激,非常感谢!

更新:

    修复了 gsutil 部分上的误导性路径替换 问题是由于 hadoop 不再支持 s3FileSystem。所以我必须降级到使用 hadoop 2.10 [已修复] 的图像。不过速度也不尽如人意

【问题讨论】:

【参考方案1】:

我认为 Dataproc 解决方案在您的情况下是多余的。如果您需要每天或每小时将 TB 的数据从 GCS 复制到 S3,Dataproc 将是有意义的。但听起来你的只是一个一次性的副本,你可以让它运行几个小时或几天。我建议在 Google Cloud (GCP) 实例上运行 gsutil。我已经为此尝试了一个 AWS EC2 实例,但对于这个特定的操作,它总是明显变慢。

在同一区域中创建您的源存储桶和目标存储桶。例如,用于 GCS 的 us-east4(弗吉尼亚北部)和用于 S3 的 us-east-1(弗吉尼亚北部)。然后在同一个 GCP 区域中部署您的实例。

gsutil -m cp -r gs://* s3://*

。 . .可能不会工作。它在 Dataproc 中肯定不起作用,如果我没有明确的文件位置或以 / 结尾的存储桶/文件夹,它总是会出错

相反,首先尝试成功地显式复制一个文件。然后尝试整个文件夹或存储桶。

您要复制多少个文件?

【讨论】:

嗨!感谢您的回复。就在今天我发现问题是由于 hadoop 3.2 不支持 s3 连接器,因此我需要回到 hadoop 2.10。关于路径:对不起,(*)表示我刚刚替换的一些路径,它有一个实际目录。至于您的问题:我正在尝试复制数百万个文件,但最终速度太慢了。我的最后一次尝试(GCP 实例中的 gsutil)以 150 小时的 ETA 实现了 3.5 MiB/s 的传输速度!持续时间需要更短。由于这是一个企业问题,存储桶区域是不可更改的,至少目前是这样。谢谢

以上是关于将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]的主要内容,如果未能解决你的问题,请参考以下文章

授予 Redshift Cluster 对另一个账户拥有的 S3 存储桶的访问权限

将文本输出写入 S3 存储桶的最佳实践是啥?

cloudfront 和 s3 301 重定向显示存储桶的裸域

私有登台S3存储桶的策略

无需凭据即可从特定 IP 授予对 AWS S3 存储桶的访问权限

定期将查询结果从 Redshift 移动到 S3 存储桶