将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]
Posted
技术标签:
【中文标题】将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]【英文标题】:GCP dataproc cluster hadoop job to move data from gs bucket to s3 amazon bucket fails [CONSOLE] 【发布时间】:2021-07-02 20:26:18 【问题描述】:关于堆栈溢出的第一个问题,所以请原谅我的任何菜鸟错误。
我目前正在将大量数据(700+ GiB)从 GCS 存储桶中的一个文件夹移动到 s3 中的一个文件夹,其中包含许多大约 1-10MB 的小文件。
我做了几次尝试:
gsutil -m rsync -r gs://<path> s3://<path>
数据量大导致超时
gsutil -m cp -r gs://<path> s3://<path>
耗时太长了。即使有许多并行进程和/或线程,它的平均传输速度仍然约为 3.4MiB/s。我已确保在此尝试中升级 VM 实例。
使用rclone
与 cp 相同的性能问题
最近我发现了另一种可能的方法。但是我对 GCP 不熟悉,所以请多多包涵,抱歉。 这是我找到的参考资料https://medium.com/swlh/transfer-data-from-gcs-to-s3-using-google-dataproc-with-airflow-aa49dc896dad 该方法包括通过 GCP 控制台使用以下配置创建一个 dataproc 集群:
Name:
<dataproc-cluster-name>
Region:
asia-southeast1
Nodes configuration:
1 main 2 worker @2vCPU & @3.75GBMemory & @30GBPersistentDisk
properties:
core fs.s3.awsAccessKeyId <key>
core fs.s3.awsSecretAccessKey <secret>
core fs.s3.impl org.apache.hadoop.fs.s3.S3FileSystem
然后我通过 GCP 网站的控制台菜单提交作业:
-
此时,我开始注意到问题,我在任何地方都找不到
hadoop-mapreduce/hadoop-distcp.jar
。我只能通过我的主 dataproc 集群 VM 实例浏览根文件来找到 /usr/lib/hadoop/hadoop-distcp.jar
我提交的工作:
Start time:
31 Mar 2021, 16:00:25
Elapsed time:
3 sec
Status:
Failed
Region
asia-southeast1
Cluster
<cluster-name>
Job type
Hadoop
Main class or JAR
file://usr/lib/hadoop/hadoop-distcp.jar
Arguments
-update
gs://*
s3://*
返回错误
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2400: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2365: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2460: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_OPTS: invalid variable name
2021-03-31 09:00:28,549 ERROR tools.DistCp: Invalid arguments:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2638)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3342)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:126)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3425)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3393)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:240)
at org.apache.hadoop.tools.DistCp.run(DistCp.java:143)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.tools.DistCp.main(DistCp.java:441)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2542)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2636)
... 18 more
Invalid arguments: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
usage: distcp OPTIONS [source_path...] <target_path>
OPTIONS
-append Reuse existing data in target files and
append new data to them if possible
-async Should distcp execution be blocking
-atomic Commit all changes or none
-bandwidth <arg> Specify bandwidth per map in MB, accepts
bandwidth as a fraction.
-blocksperchunk <arg> If set to a positive value, fileswith more
blocks than this value will be split into
chunks of <blocksperchunk> blocks to be
transferred in parallel, and reassembled on
the destination. By default,
<blocksperchunk> is 0 and the files will be
transmitted in their entirety without
splitting. This switch is only applicable
when the source file system implements
getBlockLocations method and the target
file system implements concat method
-copybuffersize <arg> Size of the copy buffer to use. By default
<copybuffersize> is 8192B.
-delete Delete from target, files missing in
source. Delete is applicable only with
update or overwrite options
-diff <arg> Use snapshot diff report to identify the
difference between source and target
-direct Write files directly to the target
location, avoiding temporary file rename.
-f <arg> List of files that need to be copied
-filelimit <arg> (Deprecated!) Limit number of files copied
to <= n
-filters <arg> The path to a file containing a list of
strings for paths to be excluded from the
copy.
-i Ignore failures during copy
-log <arg> Folder on DFS where distcp execution logs
are saved
-m <arg> Max number of concurrent maps to use for
copy
-numListstatusThreads <arg> Number of threads to use for building file
listing (max 40).
-overwrite Choose to overwrite target files
unconditionally, even if they exist.
-p <arg> preserve status (rbugpcaxt)(replication,
block-size, user, group, permission,
checksum-type, ACL, XATTR, timestamps). If
-p is specified with no <arg>, then
preserves replication, block size, user,
group, permission, checksum type and
timestamps. raw.* xattrs are preserved when
both the source and destination paths are
in the /.reserved/raw hierarchy (HDFS
only). raw.* xattrpreservation is
independent of the -p flag. Refer to the
DistCp documentation for more details.
-rdiff <arg> Use target snapshot diff report to identify
changes made on target
-sizelimit <arg> (Deprecated!) Limit number of files copied
to <= n bytes
-skipcrccheck Whether to skip CRC checks between source
and target paths.
-strategy <arg> Copy strategy to use. Default is dividing
work based on file sizes
-tmp <arg> Intermediate work path to be used for
atomic commit
-update Update target, copying only missing files
or directories
-v Log additional info (path, size) in the
SKIP/COPY log
-xtrack <arg> Save information about missing source files
to the specified directory
我该如何解决这个问题?我在网上找到的几个修复都不是很有帮助。他们要么使用 hadoop cli,要么使用不同的 jar 文件作为我的。例如这里的这个:Move data from google cloud storage to S3 using dataproc hadoop cluster and airflow 和 https://github.com/CoorpAcademy/docker-pyspark/issues/13
免责声明:我不使用 hadoop cli 或气流。我使用控制台来执行此操作,通过 dataproc 集群主 VM 实例 shell 提交作业也返回相同的错误。如果需要,任何详细的参考将不胜感激,非常感谢!
更新:
-
修复了 gsutil 部分上的误导性路径替换
问题是由于 hadoop 不再支持 s3FileSystem。所以我必须降级到使用 hadoop 2.10 [已修复] 的图像。不过速度也不尽如人意
【问题讨论】:
【参考方案1】:我认为 Dataproc 解决方案在您的情况下是多余的。如果您需要每天或每小时将 TB 的数据从 GCS 复制到 S3,Dataproc 将是有意义的。但听起来你的只是一个一次性的副本,你可以让它运行几个小时或几天。我建议在 Google Cloud (GCP) 实例上运行 gsutil。我已经为此尝试了一个 AWS EC2 实例,但对于这个特定的操作,它总是明显变慢。
在同一区域中创建您的源存储桶和目标存储桶。例如,用于 GCS 的 us-east4(弗吉尼亚北部)和用于 S3 的 us-east-1(弗吉尼亚北部)。然后在同一个 GCP 区域中部署您的实例。
gsutil -m cp -r gs://* s3://*
。 . .可能不会工作。它在 Dataproc 中肯定不起作用,如果我没有明确的文件位置或以 / 结尾的存储桶/文件夹,它总是会出错
相反,首先尝试成功地显式复制一个文件。然后尝试整个文件夹或存储桶。
您要复制多少个文件?
【讨论】:
嗨!感谢您的回复。就在今天我发现问题是由于 hadoop 3.2 不支持 s3 连接器,因此我需要回到 hadoop 2.10。关于路径:对不起,(*)表示我刚刚替换的一些路径,它有一个实际目录。至于您的问题:我正在尝试复制数百万个文件,但最终速度太慢了。我的最后一次尝试(GCP 实例中的 gsutil)以 150 小时的 ETA 实现了 3.5 MiB/s 的传输速度!持续时间需要更短。由于这是一个企业问题,存储桶区域是不可更改的,至少目前是这样。谢谢以上是关于将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]的主要内容,如果未能解决你的问题,请参考以下文章
授予 Redshift Cluster 对另一个账户拥有的 S3 存储桶的访问权限
cloudfront 和 s3 301 重定向显示存储桶的裸域