加载 Parquet 文件时无法推断架构

Posted

技术标签:

【中文标题】加载 Parquet 文件时无法推断架构【英文标题】:Unable to infer schema when loading Parquet file 【发布时间】:2017-12-10 19:32:37 【问题描述】:
response = "mi_or_chd_5"

outcome = sqlc.sql("""select eid,response as response
from outcomes
where response IS NOT NULL""".format(response=response))
outcome.write.parquet(response, mode="overwrite") # Success
print outcome.schema
StructType(List(StructField(eid,IntegerType,true),StructField(response,ShortType,true)))

然后:

outcome2 = sqlc.read.parquet(response)  # fail

失败:

AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

/usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)

parquet 的文档说格式是自我描述的,保存 parquet 文件时可以使用完整的架构。什么给了?

使用 Spark 2.1.1。在 2.2.0 中也失败了。

找到this bug report,但已修复 2.0.1、2.1.0。

更新:当连接到 master="local" 时可以工作,连接到 master="mysparkcluster" 时会失败。

【问题讨论】:

【参考方案1】:

当您尝试将空目录读取为 parquet 时,通常会发生此错误。 可能您的结果 Dataframe 是空的。

你可以在写之前用outcome.rdd.isEmpty()检查DataFrame是否为空。

【讨论】:

数据框不为空。我相信问题的发生是因为文件名response 无法在集群上写入。在本地模式下工作正常。 那么也许你可以尝试更改用户名。在 Pyspark 中:os.environ["HADOOP_USER_NAME"] = "hdfs" 或在 Scala 中System.setProperty("HADOOP_USER_NAME","hdfs") 我不确定我们是否使用了 hadoop。是否是 Spark 的要求并且需要在安装 spark 集群时配置用户配置文件? (我们所有的数据都来自关系数据库并按需加载到 Spark 中)。在任何情况下,我都不需要在文件名前加上“hdfs://”。如果使用诸如“/my/nfs/network_directory/filename”之类的文件名,则保存有效。这也让我认为路径指的是工作人员本地文件系统。 (对不起——火花n00b) 抱歉,我假设您使用的是 Hadoop。您可以在 Local[]、Standalone(仅使用 Spark 的集群)或 YARN(使用 Hadoop 的集群)中运行 Spark。如果您使用的是 YARN 模式,默认情况下所有路径都假定您使用的是 HDFS,并且没有必要输入hdfs://,实际上如果您想使用本地文件,您应该使用file://如果例如您正在发送应用程序从您的计算机到集群,应用程序将使用您的用户名,并且可能无法访问 HDFS 文件。使用 HADOOP_USER_NAME 您可以更改它在 Spark Standalone 中我不知道文件和权限是如何工作的希望这有帮助! 使用 isEmpty() 方法从来都不是一个好习惯。如果可以,请避免 - 它“可以”将整个数据带入驱动程序内存 - 请参阅 Spark 中的 RDD 类代码。【参考方案2】:

就我而言,发生错误是因为我试图读取以下划线开头的镶木地板文件(例如_lots_of_data.parquet)。不知道为什么这是一个问题,但删除前导下划线解决了这个问题。

另请参阅:

Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2

【讨论】:

Spark 将所有以_ 开头的文件视为元数据而不是数据。 "Spark 2.0 忽略以下划线或点开头的路径名;_." 正如 Spark 开发人员在此讨论的那样:issues.apache.org/jira/browse/…【参考方案3】:

我正在使用 AWS Glue,并且在从数据目录表(位置:s3 存储桶)读取数据时收到此错误。 经过一番分析,我意识到,这是由于文件位置(在我的情况下为 s3 存储桶路径)中的文件不可用。

Glue 试图在不存在的文件上应用数据目录表架构。

将文件复制到 s3 存储桶文件位置后,问题得到解决。

希望这对在 AWS Glue 中遇到/遇到错误的人有所帮助。

【讨论】:

同样使用 AWS Glue,如果作业书签过滤器导致没有数据并且您尝试写入,那么它会显示“在最终作业书签过滤器后,处理分区中 0 个文件的 0.00%”,然后导致“无法推断 Parquet 的模式。必须手动指定。”因为正在写入的帧是空的。【参考方案4】:

当您尝试读取空表时会发生这种情况。如果表中正确插入了数据,应该没有问题。

除了镶木地板之外,ORC 也会发生同样的事情。

【讨论】:

【参考方案5】:

只是为了在评论中强调@Davos 的答案,如果您的文件名在文件名的开头有一个点. 或下划线_,您将遇到这个确切的异常错误

val df = spark.read.format("csv").option("delimiter", "|").option("header", "false")
         .load("/Users/myuser/_HEADER_0")

org.apache.spark.sql.AnalysisException: 
Unable to infer schema for CSV. It must be specified manually.;

解决方法是重命名文件并重试(例如_HEADER重命名为HEADER

【讨论】:

谢谢。这不是我的错误。它认为错误是缺少文件系统。【参考方案6】:

就我而言,发生错误是因为文件名包含下划线。重写/读取不带下划线的文件(连字符可以)解决了问题...

【讨论】:

【参考方案7】:

我看到已经有很多答案了。但我面临的问题是我的 Spark 作业试图读取一个文件,该文件被之前启动的另一个 Spark 作业覆盖。听起来很糟糕,但我犯了那个错误。

【讨论】:

【参考方案8】:

对我来说,这发生在我认为加载正确的文件路径但指向不正确的文件夹时

【讨论】:

【参考方案9】:

我在读取 csv 时遇到了类似的问题

spark.read.csv("s3a://bucket/spark/csv_dir/.")

报错:

org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually.;

我发现如果我删除了尾随的.,然后它就可以工作了。即:

spark.read.csv("s3a://bucket/spark/csv_dir/")

我为 parquet 测试了这个,添加了一个尾随 .,你得到一个错误:

org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;

【讨论】:

“Spark 2.0 忽略以下划线或点开头的路径(文件)名称;_.”正如 Spark 开发人员在此讨论的那样:issues.apache.org/jira/browse/…【参考方案10】:

我刚刚遇到了同样的问题,但这里的解决方案都不适合我。我尝试通过首先读取它们并将其写入另一个地方来合并 hdfs 上的镶木地板文件的行组:

df = spark.read.parquet('somewhere')
df.write.parquet('somewhere else')

但后来当我用

查询时
spark.sql('SELECT sth FROM parquet.`hdfs://host:port/parquetfolder/` WHERE .. ')

它显示了同样的问题。 我终于通过使用 pyarrow 解决了这个问题:

df = spark.read.parquet('somewhere')
pdf = df.toPandas()
adf = pa.Table.from_pandas(pdf)   # import pyarrow as pa
fs = pa.hdfs.connect()
fw = fs.open(path, 'wb')
pq.write_table(adf, fw)           # import pyarrow.parquet as pq
fw.close()

【讨论】:

【参考方案11】:

正如其他人提到的,在我的情况下,当我读取不存在的 S3 密钥时出现此错误。 一个解决方案是过滤确实存在的键:

import com.amazonaws.services.s3.AmazonS3URI
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.sql.SparkSession
import java.net.URI

def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = 
  val uri = new URI(url)
  val hostWithEndpoint = uri.getHost + "." + domain
  new URI(uri.getScheme, uri.getUserInfo, hostWithEndpoint, uri.getPort, uri.getPath, uri.getQuery, uri.getFragment).toString


def createS3URI(url: String): AmazonS3URI = 
  try 
    // try to instantiate AmazonS3URI with url
    new AmazonS3URI(url)
   catch 
    case e: IllegalArgumentException if e.getMessage.
      startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => 
      new AmazonS3URI(addEndpointToUrl(url))
    
  


def s3FileExists(spark: SparkSession, url: String): Boolean = 
  val amazonS3Uri: AmazonS3URI = createS3URI(url)
  val s3BucketUri = new URI(s"$amazonS3Uri.getURI().getScheme://$amazonS3Uri.getBucket")

  FileSystem
    .get(s3BucketUri, spark.sparkContext.hadoopConfiguration)
    .exists(new Path(url))

您可以将其用作:

val partitions = List(yesterday, today, tomorrow)
  .map(f => somepath + "/date=" + f)
  .filter(f => s3FileExists(spark, f))

val df = spark.read.parquet(partitions: _*)

对于那个解决方案,我从spark-redshift 项目here 中提取了一些代码。

【讨论】:

【参考方案12】:

你只是在加载 parquet 文件,parquet 当然是有效的 架构。否则它不会被保存为镶木地板。这个错误意味着 -

两个 parquet 文件都不存在。 (99.99% 的情况是这个问题。Spark 错误消息通常不太明显) parquet 文件不知何故损坏了,或者根本不是 parquet 文件

【讨论】:

是的。回想起来,这对于知道如何解释 Spark 异常消息的人来说可能是显而易见的。【参考方案13】:

由于文件夹中的文件夹问题,我遇到了这个问题。

例如 folderA.parquet 应该有partion....但是它有folderB.parquet,里面有partition。

分辨率, 将文件传输到父文件夹并删除子文件夹。

【讨论】:

【参考方案14】:

你可以用 /* 阅读

outcome2 = sqlc.read.parquet(f"response/*")  # work for me

【讨论】:

【参考方案15】:

我遇到了一个 正在编写的镶木地板文件。 只需要等待它完全写入。

【讨论】:

以上是关于加载 Parquet 文件时无法推断架构的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark--调用 o50.parque 时出错

用户类抛出异常:org.apache.spark.sql.AnalysisException:无法推断 Parquet 的架构。必须手动指定

如何使作业幂等于多次运行在S3中生成相同的结果文件

将 Parquet 数据加载到 PIG 时如何避免 UnsatisfiedLinkError

Pyarrow.lib.Schema 与 pyarrow.parquet.Schema

使用 Python 编写 Parquet 文件的方法?