使用 pyspark 从 S3 服务器读取时出错:[java.lang.IllegalArgumentException]

Posted

技术标签:

【中文标题】使用 pyspark 从 S3 服务器读取时出错:[java.lang.IllegalArgumentException]【英文标题】:Getting error while reading from S3 server using pyspark : [java.lang.IllegalArgumentException] 【发布时间】:2019-11-29 09:09:56 【问题描述】:

我正在尝试使用 S3 中的 pyspark 读取文件并出现以下错误 --

Traceback (most recent call last):
  File "C:/Users/Desktop/POC_Pyspark/create_csv_ecs.py", line 41, in <module>
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 166, in load
return self._df(self._jreader.load(path))
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.IllegalArgumentException
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1307)
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1230)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:705)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

我的代码很简单,但是我可以使用 BOTO3 进行连接,但我需要使用 pyspark,因为我正在处理的文件很大,并且还需要对 CSV 进行一些聚合 --

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Window, SparkSession
import pyspark as spark
import pyspark.sql.functions as fn

conf = spark.SparkConf().setAppName("Full PSGL Aggregation - PySpark")

sc = spark.SQLContext(spark.SparkContext(conf=conf))
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://end point:8020")

df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
print(df)

java版本-

java version "1.8.0_66"

蟒蛇-

Python 3.6.1 |Anaconda 4.4.0 (64-bit)

pyspark/spark -

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 3.6.1 (default, May 11 2017 13:25:24)

如果需要更多信息,请告诉我。

【问题讨论】:

【参考方案1】:

查看S3AFileSystem的源代码,似乎fs.s3a.threads.maxfs.s3a.threads.core参数丢失或不相关。

就我而言,添加

sc._jsc.hadoopConfiguration().set("fs.s3a.threads.max", "4")
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.core", "4")

解决了这个问题。

【讨论】:

【参考方案2】:

作为S3AFileSystemhadoop-aws-2.7.7.jar 中的源代码 至少将此配置添加到您的core-site.xml

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.multipart.size</name>
  <value>104857600</value>
</property>
<property>
  <name>fs.s3a.multipart.threshold</name>
  <value>134217728</value>
</property>
<property>
  <name>fs.s3a.threads.max</name>
  <value>256</value>
</property>
<property>
  <name>fs.s3a.threads.core</name>
  <value>4</value>
</property>
<property>
  <name>fs.s3a.block.size</name>
  <value>33554432</value>
</property>

【讨论】:

以上是关于使用 pyspark 从 S3 服务器读取时出错:[java.lang.IllegalArgumentException]的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 在 s3 中写入镶木地板文件时出错

无法使用本地 PySpark 从 S3 读取 json 文件

Pyspark 从 S3 存储桶的子目录中读取所有 JSON 文件

将文件写入 s3 时出错 - pyspark

使用本地机器从 s3 读取数据 - pyspark

使用 pyspark 从 s3 读取/加载 avro 文件