将 PySpark 数据帧写入 Parquet 文件时出现 Py4JJavaError

Posted

技术标签:

【中文标题】将 PySpark 数据帧写入 Parquet 文件时出现 Py4JJavaError【英文标题】:Py4JJavaError while writing PySpark dataframe to Parquet file 【发布时间】:2022-01-04 10:22:59 【问题描述】:

总之,我有 100k 行数据作为 csv 文件。这是它的示例:

身份证、姓名、姓氏、生日、详细信息 0,Agjqyru,Qtltzu,1923-02-23,“City=Neftchala, Gender=male, Education=collage” 1, Zkaczi, Gvuvwwle, 2002-02-28, "City=Mingachevir, Gender=female, Education=doctor" 2, Hkbfros, Llmufk, 1948-02-29, "City=Ujar, Gender=male, Education=collage" 3, Dddtulkeo, Fdnccbp, 1903-07-01, "City=Dashkasan, Gender=female, Education=单身汉" 4, Wssqm, Kzekihhqjmrd, 1935-05-10, "City=Baku, Gender=female, Education=uneducated" 5,Iurg,Nglzxwu,1915-04-02,“City=Khojavend, Gender=male, Education=school”

我的任务是将详细信息列分为 3 个单独的列并将数据保存为镶木地板文件。到目前为止,这是我的尝试:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext).getOrCreate(sparkContext)
sparkSession = SparkSession.builder.master("local").appName("parquet_example").getOrCreate()

# read csv file 
__DATA = spark.read.csv('data.csv', header = True)

# divide column details into 3
__DATA = __DATA.withColumn("Details", split(regexp_replace(regexp_replace((regexp_replace("Details",'\|',"")),'\:',','),'\"|"',""),','))\
.withColumn("City", element_at("Details",1))\
.withColumn("Gender", element_at("Details",2))\
.withColumn("Education",element_at("Details",3)).drop("Details").rdd

# clean data
__DATA = __DATA.map(lambda x : (x[0], x[1], x[2], x[3][x[3].index("=")+1:], x[4][x[4].index("=")+1:], x[5][x[5].index("=")+1:]))

dfSchema = StructType([       
    StructField('Name', StringType(), True),
    StructField('Surname', StringType(), True),
    StructField('Birthdate', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('Education', StringType(), True)
])

__DATA = __DATA.toDF(dfSchema)

但是,每当我尝试使用以下代码将其保存到镶木地板文件时,我都会收到 Py4JJavaError 错误:

__DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet

那么,我错过了什么?

这是完整的错误堆栈:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-55-67be3fc6fc33> in <module>
----> 1 __DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
    934             self.partitionBy(partitionBy)
    935         self._set_opts(compression=compression)
--> 936         self._jwrite.parquet(path)
    937 
    938     @since(1.6)

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling 012.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1290.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
    at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 60, bkhddev01, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
    ... 32 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

【问题讨论】:

找不到子字符串 【参考方案1】:

错误是因为下面的代码。

x[3][x[3].index("=")+1:]

在索引 3 处,您有 Birthdate 字段。您应该在索引 3 之后开始切片操作,从那里您将清理 Details 列下的字段。

您可以仅在 DF 级别进行清理操作,而不是将 DF 转换为 RDD。

__DATA = __DATA.withColumn("Details", split(regexp_replace("Details",'\|',""),','))\
.withColumn("City", split(col("Details")[0], '=')[1]) \
.withColumn("Gender", split(col("Details")[1], '=')[1])\
.withColumn("Education",split(col("Details")[2], '=')[1]).drop("Details")

【讨论】:

哦,这很明显。难怪教育领域从我的桌子上消失了。非常感谢。 但为什么我只在最后得到一个错误?当我尝试将其保存到镶木地板文件时? 另外,我决定使用 RDD 的原因是可扩展性。当数据变大时,RDD 的执行速度会更快。 Spark 有延迟执行,只要调用 action 方法,作业就会执行。由于 write 是一个动作作业,因此你最终得到了错误。 Dataframe 比 RDD 快。

以上是关于将 PySpark 数据帧写入 Parquet 文件时出现 Py4JJavaError的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中写入镶木地板的问题

在 PySpark 中将数据帧写入 CSV 后重命名文件 [重复]

pyspark:数据帧写入镶木地板

pyspark write.parquet() 创建一个文件夹而不是 parquet 文件

如何在pyspark的循环中合并数据帧

从集群将整数/字符串写入 pyspark 中的文本文件