在 Azure Blob 存储中编写 Parquet:“其中一个请求输入无效”

Posted

技术标签:

【中文标题】在 Azure Blob 存储中编写 Parquet:“其中一个请求输入无效”【英文标题】:Writing Parquet in Azure Blob Storage: "One of the request inputs is not valid" 【发布时间】:2021-11-15 02:35:13 【问题描述】:

我正在尝试将 parquet 格式的简单 DataFrame 写入 Azure Blob 存储。 请注意,以下代码 sn-ps 在本地工作,所以我的猜测是它必须与 Azure 库相关。我也尝试了delta 格式并且它可以工作(即使它在引擎盖下使用parquet)。

使用 Spark 3.1.1、Scala 2.12.10、OpenJDK 1.8.0_292。

我像往常一样设置我的 Spark 会话,例如:

$SPARK_HOME/bin/spark-shell \
  (...cluster settings...) \
  --conf spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net="$AZURE_BLOB_STORAGE_KEY" \
  --conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
  --conf spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem \
  --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore \
  --packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0
  (...other irrelevant settings...)

我尝试了azure-storage-blobazure-storage-commonazure-storage包的其他版本,都导致了同样的问题。

为了重现问题,我创建了一个简单的数据框并将其写入存储:

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show
// +--------+-----------+                                                                                                                                  
// |language|users_count|
// +--------+-----------+
// |    Java|      20000|
// |  Python|     100000|
// |   Scala|       3000|
// +--------+-----------+

df.write.parquet("wasb://<container>@<account>.blob.core.windows.net/<path>")

以镶木地板格式书写时,出现com.microsoft.azure.storage.StorageException: One of the request inputs is not valid 异常:

21/09/21 13:38:14 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 83) (10.244.6.3 executor 6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
        ... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
        at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
        at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
        at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        ... 20 more

关于导致它的原因或如何使其工作的任何提示或想法? 谢谢!

【问题讨论】:

【参考方案1】:

有两件事帮助我使用 WASB 协议写入 Azure 存储:

    存储容器应该是 Data Lake Gen1(尝试过 Gen2 并失败)

    您需要添加以下依赖项/jar:

    org.codehaus.jackson 杰克逊-映射器-lgpl 1.9.13

我必须使用 WASB(由于我的 hadoop (2.9.2) 版本不支持 ABFS),但如果您有 hadoop-2.10.1+,请使用 ABFS。

【讨论】:

所以您认为这是存储问题,而不是库问题?我只尝试使用 BlobStorage,同时使用 Hadoop 2.7 和 3.2 (WASB)。我会看看你的建议。

以上是关于在 Azure Blob 存储中编写 Parquet:“其中一个请求输入无效”的主要内容,如果未能解决你的问题,请参考以下文章

从 Azure Databricks 将数据写入 Azure Blob 存储

如何检查图像是不是在 Azure Blob 存储上发布?

Azure Blob 存储:从 Excel 工作簿中删除密码

使用 PowerShell 脚本从 Azure Blob 存储读取 JSON 文件并写回 Blob 存储中的另一个文件

使用 C# 从存储在 azure blob 存储中的 200gb 文本文件中读取一行

从 Azure Blob 存储下载文件