Pyspark 写入数据帧并将其保存到 csv 文件中会出现错误
Posted
技术标签:
【中文标题】Pyspark 写入数据帧并将其保存到 csv 文件中会出现错误【英文标题】:Pyspark write and save a dataframe into csv file gives an error 【发布时间】:2021-01-18 02:25:17 【问题描述】:我以本地模式启动了 pyspark,目的是学习。在我尝试使用以下代码将数据帧写入 csv 文件并将其保存到 csv 文件之前,一切进展顺利:
out_path = "data/sparkify_log_small.csv"
user_log.write.save(out_path, format="csv", header=True)
它给了我这个错误,同时用 csv 文件的名称创建了一个空文件夹,而没有写入文件本身,我一直在四处寻找解决方案,但没有任何线索:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-18-960f808bce3b> in <module>
----> 1 user_log.write.save(out_path, format="csv", header=True)
C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
737 self._jwrite.save()
738 else:
--> 739 self._jwrite.save(path)
740
741 @since(1.4)
C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
C:\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o195.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 11, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
我发现这个问题之前没有答案,希望找到可以帮助我解决这个问题的人。
【问题讨论】:
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 F:\BOOKS\Data Analytics\Data Engineering Nanodegree\4. Data Lakes with Spark\Lesson 2 - Data Wrangling with Spark\Data and Exercises\data\sparkify_log_small.csv\_temporary\0\_temporary\attempt_20201002001955_0007_m_000000_11\part-00000-33b03dc3-af9a-4343-81cb-d97c53f31fd2-c000.csv
【参考方案1】:
在 spark 中你不能命名文件,因此如果你想要一个单独的 csv 文件,你可以这样做,然后如果你愿意,可以重命名它。
out_path = "data/"
user_log.repartition(1).write.option("header", "true").csv(out_path, mode = 'append')
【讨论】:
不幸的是,它也引发了几乎相同的错误Py4JJavaError: An error occurred while calling o174.csv. : org.apache.spark.SparkException: Job aborted.
检查this 答案。【参考方案2】:
正确的解决方案需要结合本网站上给出的答案。
您需要来自站点的 hadoop.dll,其中 winutils 已下载。
save 函数接受一个路径,而不是一个 .csv 文件,正如许多人所想到的那样。
真正的故事是,它要写一个hdfs 文件作为输出,并且需要一个路径。
如果我们深入研究代码,我们可以推断出 将方法签名保存为: df.write.save(dir_path, "csv", "append")
其中 dir_path 是任何 Windows 路径。
当然,还有更多选择,但是 这适用于 spark-3.1.2-bin-hadoop3.2, 在旧的 Windows 7 笔记本电脑上运行。
生成的输出文件如下所示: 部分-00000-4e8efdcb-60e3-483a-b165- 9d1a67394a0c-c000.csv
【讨论】:
【参考方案3】:我遇到了同样的错误。但是,我找到了一个主题并解决了我的问题。就我而言,我在站点中下载了正确版本的 winutils:https://github.com/cdarlint/winutils 在文件夹 bin 中我下载了 hadoop.dll 并放入了 winutils.exe 的相同路径 例如“C:\Spark\spark- 3.2.1-bin-hadoop3.2\bin"
【讨论】:
以上是关于Pyspark 写入数据帧并将其保存到 csv 文件中会出现错误的主要内容,如果未能解决你的问题,请参考以下文章
PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?
Python pyspark 将 DF 写入 .csv 并存储在本地 C 盘