Writing Parquet in Azure Blob Storage: "One of the request inputs is not valid"
Posted: 2021-11-15 02:35:13

Question: I am trying to write a simple DataFrame in Parquet format to Azure Blob Storage.
Note that the code snippets below work locally, so my guess is that the problem is related to the Azure libraries. I also tried the Delta format and it works (even though it uses Parquet under the hood).
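For reference, here is a minimal sketch of the Delta write that does succeed; it is not from the original post, and it assumes the same df and the same <container>/<account>/<path> placeholders used in the reproduction further below:
// Sketch only: Delta write via delta-core 0.8.0 (already listed in --packages below)
df.write
  .format("delta")
  .save("wasb://<container>@<account>.blob.core.windows.net/<path>")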
Using Spark 3.1.1, Scala 2.12.10, OpenJDK 1.8.0_292.
I set up my Spark session as usual, for example:
$SPARK_HOME/bin/spark-shell \
(...cluster settings...) \
--conf spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net="$AZURE_BLOB_STORAGE_KEY" \
--conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
--conf spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem \
--conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore \
--packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0
(...other irrelevant settings...)
I tried other versions of the azure-storage-blob, azure-storage-common and azure-storage packages; all of them led to the same problem.
To reproduce the problem, I create a simple DataFrame and write it to storage:
val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show
// +--------+-----------+
// |language|users_count|
// +--------+-----------+
// | Java| 20000|
// | Python| 100000|
// | Scala| 3000|
// +--------+-----------+
df.write.parquet("wasb://<container>@<account>.blob.core.windows.net/<path>")
When writing in Parquet format, I get a com.microsoft.azure.storage.StorageException: One of the request inputs is not valid exception:
21/09/21 13:38:14 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 83) (10.244.6.3 executor 6): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
... 20 more
Any hints or ideas on what is causing this, or how to make it work? Thanks!
Answer 1: Two things helped me write to Azure Storage using the WASB protocol:
The storage container should be Data Lake Gen1 (I tried Gen2 and it failed).
You need to add the following dependency/jar:

org.codehaus.jackson:jackson-mapper-lgpl:1.9.13

I had to use WASB (because my Hadoop version (2.9.2) does not support ABFS), but if you have hadoop-2.10.1+, use ABFS instead.
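Purely as an illustration (not part of the original answer), an ABFS write on hadoop-2.10.1+ might look roughly like this; fs.azure.account.key.<account>.dfs.core.windows.net is the standard ABFS account-key setting, and the <container>/<account>/<path> placeholders are assumptions:
// Sketch: switching from WASB to ABFS (needs a hadoop-azure version that ships the ABFS driver)
spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<account>.dfs.core.windows.net",
  sys.env("AZURE_BLOB_STORAGE_KEY"))
df.write.parquet("abfss://<container>@<account>.dfs.core.windows.net/<path>")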
Discussion:
So you think it is a storage issue rather than a library issue? I only tried Blob Storage, with both Hadoop 2.7 and 3.2 (WASB). I will look into your suggestion.