Getting error while reading from S3 server using pyspark: [java.lang.IllegalArgumentException]
【Posted】2019-11-29 09:09:56 【Problem description】: I am trying to read a file from S3 using pyspark and I am getting the following error --
Traceback (most recent call last):
File "C:/Users/Desktop/POC_Pyspark/create_csv_ecs.py", line 41, in <module>
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 166, in load
return self._df(self._jreader.load(path))
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.IllegalArgumentException
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1307)
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1230)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:705)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
My code is quite simple. I can connect using boto3, but I need to use pyspark because the file I am processing is large and I also need to run some aggregations on the CSV --
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Window, SparkSession
import pyspark as spark
import pyspark.sql.functions as fn
conf = spark.SparkConf().setAppName("Full PSGL Aggregation - PySpark")
sc = spark.SQLContext(spark.SparkContext(conf=conf))
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://end point:8020")
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
print(df)
Java version -
java version "1.8.0_66"
Python -
Python 3.6.1 |Anaconda 4.4.0 (64-bit)
pyspark/spark -
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
Using Python version 3.6.1 (default, May 11 2017 13:25:24)
Please let me know if any more information is needed.
【Comments】:
【Answer 1】: Looking at the source code of S3AFileSystem, it seems that the fs.s3a.threads.max or fs.s3a.threads.core parameter is missing or invalid. This matches the top of the stack trace: the ThreadPoolExecutor constructor throws IllegalArgumentException when the core or maximum pool size is negative, zero, or inconsistent.
In my case, adding
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.max", "4")
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.core", "4")
solved the problem.
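For reference, a minimal end-to-end sketch combining these two settings with the configuration from the question (the access key, secret key, endpoint, and bucket path are placeholders taken from the original post, not verified values):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Full PSGL Aggregation - PySpark")
sc = SQLContext(SparkContext(conf=conf))

hconf = sc._jsc.hadoopConfiguration()
# S3A connector, credentials and endpoint (placeholders, as in the question)
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hconf.set("fs.s3a.access.key", "access_key")
hconf.set("fs.s3a.secret.key", "secret_key")
hconf.set("fs.s3a.endpoint", "http://endpoint:8020")
# Thread-pool sizes: the ThreadPoolExecutor constructor rejects a missing or
# non-positive maximum, so give both settings small positive values
hconf.set("fs.s3a.threads.max", "4")
hconf.set("fs.s3a.threads.core", "4")

df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
df.show(5)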
【Comments】:
【Answer 2】: Based on the S3AFileSystem source code in hadoop-aws-2.7.7.jar, add at least the following configuration to your core-site.xml:
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>104857600</value>
</property>
<property>
<name>fs.s3a.multipart.threshold</name>
<value>134217728</value>
</property>
<property>
<name>fs.s3a.threads.max</name>
<value>256</value>
</property>
<property>
<name>fs.s3a.threads.core</name>
<value>4</value>
</property>
<property>
<name>fs.s3a.block.size</name>
<value>33554432</value>
</property>
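If you are running Spark locally and not maintaining a core-site.xml (as in the question), a rough equivalent is to set the same properties programmatically on the Hadoop configuration of the existing sc from the question; the values below are simply copied from the XML above:

hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hconf.set("fs.s3a.multipart.size", "104857600")       # 100 MB
hconf.set("fs.s3a.multipart.threshold", "134217728")  # 128 MB
hconf.set("fs.s3a.threads.max", "256")
hconf.set("fs.s3a.threads.core", "4")
hconf.set("fs.s3a.block.size", "33554432")             # 32 MB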
【Comments】: