Py4JJavaError while writing a PySpark dataframe to a Parquet file
Posted: 2022-01-04 10:22:59

Question:

In short, I have 100k rows of data in a CSV file. Here is a sample of it:
Id,Name,Surname,Birthdate,Details
0,Agjqyru,Qtltzu,1923-02-23,"City=Neftchala, Gender=male, Education=collage"
1,Zkaczi,Gvuvwwle,2002-02-28,"City=Mingachevir, Gender=female, Education=doctor"
2,Hkbfros,Llmufk,1948-02-29,"City=Ujar, Gender=male, Education=collage"
3,Dddtulkeo,Fdnccbp,1903-07-01,"City=Dashkasan, Gender=female, Education=bachelor"
4,Wssqm,Kzekihhqjmrd,1935-05-10,"City=Baku, Gender=female, Education=uneducated"
5,Iurg,Nglzxwu,1915-04-02,"City=Khojavend, Gender=male, Education=school"
My task is to split the Details column into 3 separate columns and save the data as a Parquet file. Here is my attempt so far:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# On Spark 2.x+ a single SparkSession is enough; the separate SparkContext
# and SQLContext objects were redundant, and naming the session `spark`
# lets the read call below resolve.
spark = SparkSession.builder.master("local").appName("parquet_example").getOrCreate()
# read csv file
__DATA = spark.read.csv('data.csv', header = True)
# divide column details into 3
__DATA = __DATA.withColumn("Details", split(regexp_replace(regexp_replace(regexp_replace("Details", r'\|', ""), r'\:', ','), r'"', ""), ','))\
.withColumn("City", element_at("Details",1))\
.withColumn("Gender", element_at("Details",2))\
.withColumn("Education",element_at("Details",3)).drop("Details").rdd
# clean data
__DATA = __DATA.map(lambda x : (x[0], x[1], x[2], x[3][x[3].index("=")+1:], x[4][x[4].index("=")+1:], x[5][x[5].index("=")+1:]))
dfSchema = StructType([
StructField('Name', StringType(), True),
StructField('Surname', StringType(), True),
StructField('Birthdate', StringType(), True),
StructField('City', StringType(), True),
StructField('Gender', StringType(), True),
StructField('Education', StringType(), True)
])
__DATA = __DATA.toDF(dfSchema)
However, whenever I try to save it to a Parquet file with the following code, I get a Py4JJavaError:
__DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet
So, what am I missing?
Here is the full error stack:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-55-67be3fc6fc33> in <module>
----> 1 __DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
934 self.partitionBy(partitionBy)
935 self._set_opts(compression=compression)
--> 936 self._jwrite.parquet(path)
937
938 @since(1.6)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1290.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 60, bkhddev01, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
... 32 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Comments:

substring not found

Answer 1:

The error is caused by the code below:
x[3][x[3].index("=")+1:]
At index 3 you have the Birthdate field, which contains no "=". You should start the slicing operation after index 3; from that point on you are cleaning the fields that came from the Details column.
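To make the index shift concrete, here is a plain-Python sketch of a corrected mapper on one hypothetical row, laid out the way Spark would pass it after the withColumn/drop chain (Id, Name, Surname, Birthdate, City, Gender, Education). The sample values are illustrative, not from the poster's file:

```python
# Hypothetical row after the withColumn(...)/drop("Details") chain
row = ("0", "Agjqyru", "Qtltzu", "1923-02-23",
       "City=Neftchala", " Gender=male", " Education=collage")

# Corrected mapper: slice only the Details-derived fields (indexes 4-6).
# Index 3 is Birthdate, which has no "=" -- slicing it is exactly what
# raised "ValueError: substring not found".
clean = lambda x: (x[0], x[1], x[2], x[3],
                   x[4][x[4].index("=") + 1:],
                   x[5][x[5].index("=") + 1:],
                   x[6][x[6].index("=") + 1:])

print(clean(row))
# -> ('0', 'Agjqyru', 'Qtltzu', '1923-02-23', 'Neftchala', 'male', 'collage')
```

Note that keeping all seven values also means the StructType would need an Id field in addition to the six fields the poster declared.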
Instead of converting the DataFrame to an RDD, you can do the cleaning entirely at the DataFrame level:
__DATA = __DATA.withColumn("Details", split(regexp_replace("Details", r'\|', ""), ','))\
.withColumn("City", split(col("Details")[0], '=')[1]) \
.withColumn("Gender", split(col("Details")[1], '=')[1])\
.withColumn("Education",split(col("Details")[2], '=')[1]).drop("Details")
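For intuition, the split-then-index column expressions above behave like this plain-Python trace on a single sample Details value (hypothetical data, after the surrounding quotes have been stripped):

```python
# One sample Details value with the CSV quoting already removed
details = "City=Neftchala, Gender=male, Education=collage"

parts = details.split(",")          # ["City=Neftchala", " Gender=male", " Education=collage"]
city = parts[0].split("=")[1]       # "Neftchala"
gender = parts[1].split("=")[1]     # "male"  (the leading space sits before "Gender")
education = parts[2].split("=")[1]  # "collage"

print(city, gender, education)
# -> Neftchala male collage
```

The leading spaces after each comma end up on the key side of the "=", so the extracted values come out clean without an explicit trim.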
Comments:

Oh, that's obvious now. No wonder the Education field disappeared from my table. Thank you very much.

But why do I only get the error at the very end, when I try to save to the Parquet file?

Also, the reason I chose RDDs was scalability: I assumed RDDs would execute faster once the data grows.

Spark uses lazy evaluation: a job only runs once an action method is called. Since write is an action, that is when the error finally surfaced. Also, DataFrames are generally faster than RDDs, not the other way around.
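The lazy-evaluation point in that last comment can be sketched with plain Python generators (an analogy only, not Spark itself; the values are hypothetical):

```python
# "Transformations" build a lazy pipeline; nothing executes until consumed.
def parse(values):
    for v in values:
        yield v[v.index("=") + 1:]  # fails only when this value is actually pulled

pipeline = parse(["City=Baku", "1935-05-10"])  # no error yet -- nothing has run
first = next(pipeline)                          # "Baku"
try:
    next(pipeline)                              # the bad value is reached only now
    failed = False
except ValueError:
    failed = True

print(first, failed)
# -> Baku True
```

This mirrors why the broken lambda produced no error when it was defined and only blew up inside the write action.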