将pyspark的数据框写入镶木地板时出错

Posted

技术标签:

【中文标题】将pyspark的数据框写入镶木地板时出错【英文标题】:Error when writing pyspark's dataframe into parquet 【发布时间】:2022-01-01 13:05:17 【问题描述】:

cleaned_mercury.write.mode('overwrite').parquet("../data/transformed-data/cleaned_mercury.parquet")

cleaned_mercury 是一个数据框,每当我尝试将数据转换为镶木地板时,它都会返回错误,我尝试到处寻找答案,但我找不到答案

~\AppData\Local\Temp/ipykernel_19676/2099139696.py in <module>
----> 1 cleaned_mercury.write.mode('overwrite').parquet("../data/transformed-data/cleaned_mercury.parquet")
      2 cleaned_watsons.write.mode('overwrite').parquet("../data/transformed-data/cleaned_watsons.parquet")
      3 cleaned_tgp.write.mode('overwrite').parquet("../data/transformed-data/cleaned_tgp.parquet")
      4 cleaned_ssd.write.mode('overwrite').parquet("../data/transformed-data/cleaned_ssd.parquet")
      5 cleaned_rose.write.mode('overwrite').parquet("../data/transformed-data/cleaned_rose.parquet")

C:\spark-3.2.0-bin-hadoop3.2\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
    883             self.partitionBy(partitionBy)
    884         self._set_opts(compression=compression)
--> 885         self._jwrite.parquet(path)
    886 
    887     def text(self, path, compression=None, lineSep=None):

C:\spark-3.2.0-bin-hadoop3.2\python\lib\py4j-0.10.9.2-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

C:\spark-3.2.0-bin-hadoop3.2\python\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

C:\spark-3.2.0-bin-hadoop3.2\python\lib\py4j-0.10.9.2-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling 012.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1594.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
    ... 42 more

【问题讨论】:

【参考方案1】:

原来我没有安装hadoop.dll,为什么我在***https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/hadoop.dll987654321@发布我的问题后自己找出答案。

【讨论】:

似乎 SO 不太受欢迎。但是现在是节日,许多新手可以加强那里的行为。那个sai,你的问题很难回答。 就我而言,我必须正确设置 HADOOP_HOME 目录,并且将 HADOOP_HOME 的“bin”子目录添加到 PATH。例如。来自提升的 PS 提示:[Environment]::SetEnvironmentVariable("PATH", "$env:PATH;$env:HADOOP_HOME\bin", 'Machine')

以上是关于将pyspark的数据框写入镶木地板时出错的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 数据框写入镶木地板而不删除 /_temporary 文件夹

无法将数据框保存到镶木地板 pyspark

在 PySpark 中写入镶木地板的问题

pyspark:数据帧写入镶木地板

udf 创建的时间序列无法写入镶木地板

将小 PySpark DataFrame 写入镶木地板时出现内存错误