Error when reading a csv with pyspark from an AWS S3 bucket

【Title】:Error when reading a csv with pyspark from an AWS S3 bucket 【Posted】:2021-06-03 16:19:16 【Question】:

I am running a Spark cluster from a docker-compose file, with SPARK_VERSION="3.0.0" and HADOOP_VERSION="3.2".

All the files can be found at the following GitHub link: https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker

I am trying to read a csv file from AWS S3 with the following code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.874,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType, LongType,StructType


sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("pyspark-2")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

AWS_ACCESS_KEY_ID = "AKIAXJFEEOGKW4RSZG3B"
AWS_SECRET_ACCESS_KEY = "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"

sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf=sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark.read.csv("s3a://<PATH>")

But I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-24141d88d3e8> in <module>
----> 1 df = spark.read.csv("s3a://migrationawsbucket/allevents.csv")

/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.csv.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StreamCapabilities
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 41 more

I have already tried the different solutions found in this link: How can I read from S3 in pyspark running in local mode?

--- UPDATE ---

As stevel suggested, I have tried changing the versions of aws-java-sdk-bundle and hadoop-aws, looking at the different versions available at: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

I have tried using hadoop-aws-3.2.0.jar and aws-java-sdk-bundle-1.11.874.jar after finding this answer: AWS EKS Spark 3.0, Hadoop 3.2 Error - NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException

But I get the same error.

【Question comments】:

【Answer 1】:

There is a fundamental mismatch between the hadoop-aws JAR and the AWS SDK; I'm afraid they don't mix and match.

Use mvnrepository to work out the exact versions of the hadoop-* artifacts your Spark release needs, and don't even think about mixing hadoop-* artifact versions: they are as tightly integrated with each other as the Spark JARs are.

I suggest you upgrade to a Spark release built against hadoop-3.2.x or later artifacts, which are compatible with the most recent 1.11 AWS SDK.
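For reference, below is a minimal sketch of what a matched setup could look like for the Spark 3.0.0 / Hadoop 3.2 image from the question. The aws-java-sdk-bundle version 1.11.375 is an assumption (it is the SDK release commonly paired with hadoop-aws 3.2.0); confirm it against the hadoop-aws 3.2.0 POM on mvnrepository before relying on it, and the bucket path is a placeholder.

from pyspark.sql import SparkSession

# Sketch only: hadoop-aws must match the Hadoop version the cluster ships
# (3.2.0 per the question), and aws-java-sdk-bundle must match the SDK
# version that hadoop-aws 3.2.0 declares in its POM (assumed 1.11.375 here).
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("pyspark-s3a-matched-versions")
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.2.0,"
            "com.amazonaws:aws-java-sdk-bundle:1.11.375")
    .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
    .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
    .getOrCreate()
)

df = spark.read.csv("s3a://<bucket>/<key>.csv")

If the same error persists, it usually means an older hadoop-common is still on the classpath: org.apache.hadoop.fs.StreamCapabilities only exists from Hadoop 2.8 onwards, so the NoClassDefFoundError in the question suggests that the hadoop-aws being loaded is newer than the hadoop-* JARs it is running against.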

【Comments】:

I have updated my question; I have tried different versions but I keep hitting the same error. Can you help me?
No, judging from the stack trace it is still a mix of hadoop-* JARs. If you are using EMR, you have to use their artifacts exclusively. If you are using the ASF releases, grab a Hadoop tarball (3.3.2) and use its hadoop-* JARs together with the matching aws-sdk JAR.
BTW, the "storediag" command of the cloudstore JAR is designed to find and print the dependencies: github.com/steveloughran/cloudstore
This is the limit of how much I can help. Classpath problems are configuration problems, so they can only be debugged by the person who has the problem. Sorry.
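A quick way to act on that classpath advice (a diagnostic sketch, not part of the original thread) is to ask the driver JVM which Hadoop version it actually loaded and compare it with the hadoop-aws version passed via --packages:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Hadoop version compiled into the running Spark JVM (via the Py4J gateway)
print(sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
# Spark version, to cross-check against the "pre-built for Hadoop x.y" download label
print(sc.version)

If the printed Hadoop version is not 3.2.x, the hadoop-aws:3.2.0 package pulled in with --packages will be mismatched, which is exactly the mix-and-match problem described above.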
