Pyspark write and save a dataframe into csv file gives an error

Posted: 2021-01-18 02:25:17

Question:

I started pyspark in local mode for learning purposes. Everything was going well until I tried to write a dataframe out and save it as a csv file with the following code:

out_path = "data/sparkify_log_small.csv"
user_log.write.save(out_path, format="csv", header=True)

It gave me the error below, while creating an empty folder named after the csv file without writing the file itself. I have been searching around for a solution but have no clue:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-960f808bce3b> in <module>
----> 1 user_log.write.save(out_path, format="csv", header=True)

C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
    737             self._jwrite.save()
    738         else:
--> 739             self._jwrite.save(path)
    740 
    741     @since(1.4)

C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o195.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 11, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

I have seen this question asked before without an answer, and I hope to find someone who can help me solve it.

Comments:

Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv

Answer 1:

In Spark you cannot name the output file directly, so if you want a single csv file you can do the following, and then rename it afterwards if you wish:

out_path = "data/"
user_log.repartition(1).write.option("header", "true").csv(out_path, mode = 'append')
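If you then want that single output under a specific name, a minimal follow-up sketch (the target filename below is hypothetical) could be:

import glob
import shutil

out_path = "data/"  # directory Spark wrote the csv output into (from the snippet above)

# Spark names the output part-00000-<uuid>-c000.csv inside out_path;
# pick up that single part file and move it to a name of your choice
part_file = glob.glob(out_path + "part-*.csv")[0]
shutil.move(part_file, "data/sparkify_log_small_single.csv")  # hypothetical target name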

Comments:

Unfortunately, it also raises almost the same error: Py4JJavaError: An error occurred while calling o174.csv. : org.apache.spark.SparkException: Job aborted. Check this answer.

Answer 2:

The correct solution combines the answers given on this site:

    You need hadoop.dll from the same site where winutils was downloaded.

    The save function accepts a path, not a .csv file, as many people assume.

    The real story is that it writes an hdfs file as output, and it needs a path.

    If we dig into the code, we can infer the save method signature as: df.write.save(dir_path, "csv", "append"), where dir_path is any Windows path (see the sketch after this list).

    Of course there are more options, but this works with spark-3.1.2-bin-hadoop3.2, running on an old Windows 7 laptop.

    The resulting output file looks like: part-00000-4e8efdcb-60e3-483a-b165-9d1a67394a0c-c000.csv
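A minimal sketch of that call, assuming winutils.exe and hadoop.dll are already in place; the input file and output directory below are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("csv-write").getOrCreate()

# hypothetical input; any DataFrame works here
df = spark.read.json("data/sparkify_log_small.json")

# pass a directory, not a .csv file name; Spark creates part-xxxxx files inside it
out_dir = r"C:\tmp\sparkify_out"
df.write.save(out_dir, "csv", "append", header=True)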

Comments:

Answer 3:

I ran into the same error, but I found a thread that solved my problem. In my case, I downloaded the correct version of winutils from https://github.com/cdarlint/winutils ; from its bin folder I downloaded hadoop.dll and placed it in the same path as winutils.exe, for example "C:\Spark\spark-3.2.1-bin-hadoop3.2\bin".
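One way to make sure the JVM actually picks up those native binaries is to point HADOOP_HOME and PATH at that folder before the Spark session starts; a sketch, assuming the bin path quoted in this answer:

import os

# hypothetical setup: folder whose bin subdirectory contains winutils.exe and hadoop.dll
spark_home = r"C:\Spark\spark-3.2.1-bin-hadoop3.2"
os.environ["HADOOP_HOME"] = spark_home
os.environ["PATH"] = os.path.join(spark_home, "bin") + os.pathsep + os.environ["PATH"]

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()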

Comments:
