Error when reading a CSV from an AWS S3 bucket with PySpark
Posted: 2021-06-03 16:19:16

I am running a Spark cluster from a docker-compose file, with SPARK_VERSION="3.0.0" and HADOOP_VERSION="3.2".
All the files can be found in this GitHub repository: https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker
I am trying to read a CSV file from AWS S3 with the following code:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.874,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType, LongType,StructType
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("pyspark-2")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
# credentials redacted here; never hard-code real keys in a post or a script
AWS_ACCESS_KEY_ID = "<AWS_ACCESS_KEY_ID>"
AWS_SECRET_ACCESS_KEY = "<AWS_SECRET_ACCESS_KEY>"
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf=sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
df = spark.read.csv("s3a://<PATH>")
But I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-24141d88d3e8> in <module>
----> 1 df = spark.read.csv("s3a://migrationawsbucket/allevents.csv")
/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
533 path = [path]
534 if type(path) == list:
--> 535 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
536 elif isinstance(path, RDD):
537 def func(iterator):
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o37.csv.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StreamCapabilities
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 41 more
I have already tried the different solutions found in this link: How can I read from S3 in pyspark running in local mode?
--- UPDATE ---

As stevel suggested, I have tried changing the aws-java-sdk-bundle and hadoop-aws JARs, looking at the different versions available here: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

I tried hadoop-aws-3.2.0.jar together with aws-java-sdk-bundle-1.11.874.jar, after finding this answer: AWS EKS Spark 3.0, Hadoop 3.2 Error - NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException

But I still get the same error.
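One check that makes the mismatch visible (a sketch, assuming the SparkContext sc from the code above is still alive; this is not something I have run on the cluster): print the Hadoop version the JVM actually resolves and compare it with the hadoop-aws version being passed in --packages.

# Sketch: the hadoop-aws and aws-java-sdk-bundle artifacts must match this version exactly.
print(sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion())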
Answer 1:

There is a fundamental mismatch between the hadoop-aws JARs and the AWS SDK; I'm afraid they do not mix and match.

Use mvnrepository to work out the exact version of the hadoop-* artifacts your Spark build needs, and do not even think about mixing hadoop-* artifact versions: they are as tightly integrated with each other as the Spark JARs are.

I recommend upgrading to a Spark release built against hadoop-3.2.x or later artifacts, which are compatible with the recent 1.11 AWS SDKs.
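A minimal sketch of that recommendation (the version pairing below is illustrative, not taken from the answer: keep hadoop-aws at exactly the Hadoop version your Spark build ships with, and look up on mvnrepository which aws-java-sdk-bundle version that hadoop-aws release declares in its POM rather than guessing):

# Hedged sketch: let Spark resolve matching S3A dependencies instead of hand-picking JARs.
# hadoop-aws must match the Hadoop version bundled with Spark (3.2.0 in the question's image),
# and aws-java-sdk-bundle must be the version that this hadoop-aws release itself depends on.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://spark-master:7077")
         .appName("s3a-read")
         .config("spark.jars.packages",
                 "org.apache.hadoop:hadoop-aws:3.2.0,"
                 "com.amazonaws:aws-java-sdk-bundle:1.11.375")  # illustrative pairing, verify on mvnrepository
         .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
         .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
         .getOrCreate())

df = spark.read.csv("s3a://<BUCKET>/<KEY>.csv", header=True)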
Comments:
I have updated my question. I have tried different versions, but none of them work. Can you help me?

No, judging from the stack trace it is still a mix of hadoop-* JARs. If you are on EMR you have to use their artifacts exclusively. If you are using the ASF releases, grab a Hadoop tarball (3.3.2) and use its hadoop-* JARs together with the matching aws-sdk JAR.

BTW, the "storediag" command of the cloudstore JAR is designed to find and print the dependencies: github.com/steveloughran/cloudstore

This is as far as I can help. Classpath problems are configuration problems, so they can only be debugged by the person who has them. Sorry.
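For reference, a small sketch of the kind of classpath audit described above (the SPARK_HOME fallback path is an assumption, not something stated in the thread): list the hadoop-* and aws-* JARs that Spark actually loads and check that they all carry one and the same Hadoop version.

# Sketch: spot mixed hadoop-* versions, the usual cause of NoClassDefFoundError: StreamCapabilities.
import glob, os

jars_dir = os.path.join(os.environ.get("SPARK_HOME", "/usr/local/spark"), "jars")
for jar in sorted(glob.glob(os.path.join(jars_dir, "*.jar"))):
    name = os.path.basename(jar)
    if name.startswith(("hadoop-", "aws-java-sdk")):
        print(name)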