加载 Parquet 文件时无法推断架构
Posted
技术标签:
【中文标题】加载 Parquet 文件时无法推断架构【英文标题】:Unable to infer schema when loading Parquet file 【发布时间】:2017-12-10 19:32:37 【问题描述】:response = "mi_or_chd_5"
outcome = sqlc.sql("""select eid,response as response
from outcomes
where response IS NOT NULL""".format(response=response))
outcome.write.parquet(response, mode="overwrite") # Success
print outcome.schema
StructType(List(StructField(eid,IntegerType,true),StructField(response,ShortType,true)))
然后:
outcome2 = sqlc.read.parquet(response) # fail
失败:
AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
在
/usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)
parquet 的文档说格式是自我描述的,保存 parquet 文件时可以使用完整的架构。什么给了?
使用 Spark 2.1.1。在 2.2.0 中也失败了。
找到this bug report,但已修复 2.0.1、2.1.0。
更新:当连接到 master="local" 时可以工作,连接到 master="mysparkcluster" 时会失败。
【问题讨论】:
【参考方案1】:当您尝试将空目录读取为 parquet 时,通常会发生此错误。 可能您的结果 Dataframe 是空的。
你可以在写之前用outcome.rdd.isEmpty()
检查DataFrame是否为空。
【讨论】:
数据框不为空。我相信问题的发生是因为文件名response
无法在集群上写入。在本地模式下工作正常。
那么也许你可以尝试更改用户名。在 Pyspark 中:os.environ["HADOOP_USER_NAME"] = "hdfs"
或在 Scala 中System.setProperty("HADOOP_USER_NAME","hdfs")
我不确定我们是否使用了 hadoop。是否是 Spark 的要求并且需要在安装 spark 集群时配置用户配置文件? (我们所有的数据都来自关系数据库并按需加载到 Spark 中)。在任何情况下,我都不需要在文件名前加上“hdfs://”。如果使用诸如“/my/nfs/network_directory/filename”之类的文件名,则保存有效。这也让我认为路径指的是工作人员本地文件系统。 (对不起——火花n00b)
抱歉,我假设您使用的是 Hadoop。您可以在 Local[]、Standalone(仅使用 Spark 的集群)或 YARN(使用 Hadoop 的集群)中运行 Spark。如果您使用的是 YARN 模式,默认情况下所有路径都假定您使用的是 HDFS,并且没有必要输入hdfs://
,实际上如果您想使用本地文件,您应该使用file://
如果例如您正在发送应用程序从您的计算机到集群,应用程序将使用您的用户名,并且可能无法访问 HDFS 文件。使用 HADOOP_USER_NAME 您可以更改它在 Spark Standalone 中我不知道文件和权限是如何工作的希望这有帮助!
使用 isEmpty() 方法从来都不是一个好习惯。如果可以,请避免 - 它“可以”将整个数据带入驱动程序内存 - 请参阅 Spark 中的 RDD 类代码。【参考方案2】:
就我而言,发生错误是因为我试图读取以下划线开头的镶木地板文件(例如_lots_of_data.parquet
)。不知道为什么这是一个问题,但删除前导下划线解决了这个问题。
另请参阅:
Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2【讨论】:
Spark 将所有以_
开头的文件视为元数据而不是数据。
"Spark 2.0 忽略以下划线或点开头的路径名;_
或 .
" 正如 Spark 开发人员在此讨论的那样:issues.apache.org/jira/browse/…【参考方案3】:
我正在使用 AWS Glue,并且在从数据目录表(位置:s3 存储桶)读取数据时收到此错误。 经过一番分析,我意识到,这是由于文件位置(在我的情况下为 s3 存储桶路径)中的文件不可用。
Glue 试图在不存在的文件上应用数据目录表架构。
将文件复制到 s3 存储桶文件位置后,问题得到解决。
希望这对在 AWS Glue 中遇到/遇到错误的人有所帮助。
【讨论】:
同样使用 AWS Glue,如果作业书签过滤器导致没有数据并且您尝试写入,那么它会显示“在最终作业书签过滤器后,处理分区中 0 个文件的 0.00%”,然后导致“无法推断 Parquet 的模式。必须手动指定。”因为正在写入的帧是空的。【参考方案4】:当您尝试读取空表时会发生这种情况。如果表中正确插入了数据,应该没有问题。
除了镶木地板之外,ORC 也会发生同样的事情。
【讨论】:
【参考方案5】:只是为了在评论中强调@Davos 的答案,如果您的文件名在文件名的开头有一个点.
或下划线_
,您将遇到这个确切的异常错误
val df = spark.read.format("csv").option("delimiter", "|").option("header", "false")
.load("/Users/myuser/_HEADER_0")
org.apache.spark.sql.AnalysisException:
Unable to infer schema for CSV. It must be specified manually.;
解决方法是重命名文件并重试(例如_HEADER
重命名为HEADER
)
【讨论】:
谢谢。这不是我的错误。它认为错误是缺少文件系统。【参考方案6】:就我而言,发生错误是因为文件名包含下划线。重写/读取不带下划线的文件(连字符可以)解决了问题...
【讨论】:
【参考方案7】:我看到已经有很多答案了。但我面临的问题是我的 Spark 作业试图读取一个文件,该文件被之前启动的另一个 Spark 作业覆盖。听起来很糟糕,但我犯了那个错误。
【讨论】:
【参考方案8】:对我来说,这发生在我认为加载正确的文件路径但指向不正确的文件夹时
【讨论】:
【参考方案9】:我在读取 csv 时遇到了类似的问题
spark.read.csv("s3a://bucket/spark/csv_dir/.")
报错:
org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually.;
我发现如果我删除了尾随的.
,然后它就可以工作了。即:
spark.read.csv("s3a://bucket/spark/csv_dir/")
我为 parquet
测试了这个,添加了一个尾随 .
,你得到一个错误:
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
【讨论】:
“Spark 2.0 忽略以下划线或点开头的路径(文件)名称;_
或 .
”正如 Spark 开发人员在此讨论的那样:issues.apache.org/jira/browse/…【参考方案10】:
我刚刚遇到了同样的问题,但这里的解决方案都不适合我。我尝试通过首先读取它们并将其写入另一个地方来合并 hdfs 上的镶木地板文件的行组:
df = spark.read.parquet('somewhere')
df.write.parquet('somewhere else')
但后来当我用
查询时spark.sql('SELECT sth FROM parquet.`hdfs://host:port/parquetfolder/` WHERE .. ')
它显示了同样的问题。 我终于通过使用 pyarrow 解决了这个问题:
df = spark.read.parquet('somewhere')
pdf = df.toPandas()
adf = pa.Table.from_pandas(pdf) # import pyarrow as pa
fs = pa.hdfs.connect()
fw = fs.open(path, 'wb')
pq.write_table(adf, fw) # import pyarrow.parquet as pq
fw.close()
【讨论】:
【参考方案11】:正如其他人提到的,在我的情况下,当我读取不存在的 S3 密钥时出现此错误。 一个解决方案是过滤确实存在的键:
import com.amazonaws.services.s3.AmazonS3URI
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.sql.SparkSession
import java.net.URI
def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String =
val uri = new URI(url)
val hostWithEndpoint = uri.getHost + "." + domain
new URI(uri.getScheme, uri.getUserInfo, hostWithEndpoint, uri.getPort, uri.getPath, uri.getQuery, uri.getFragment).toString
def createS3URI(url: String): AmazonS3URI =
try
// try to instantiate AmazonS3URI with url
new AmazonS3URI(url)
catch
case e: IllegalArgumentException if e.getMessage.
startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") =>
new AmazonS3URI(addEndpointToUrl(url))
def s3FileExists(spark: SparkSession, url: String): Boolean =
val amazonS3Uri: AmazonS3URI = createS3URI(url)
val s3BucketUri = new URI(s"$amazonS3Uri.getURI().getScheme://$amazonS3Uri.getBucket")
FileSystem
.get(s3BucketUri, spark.sparkContext.hadoopConfiguration)
.exists(new Path(url))
您可以将其用作:
val partitions = List(yesterday, today, tomorrow)
.map(f => somepath + "/date=" + f)
.filter(f => s3FileExists(spark, f))
val df = spark.read.parquet(partitions: _*)
对于那个解决方案,我从spark-redshift
项目here 中提取了一些代码。
【讨论】:
【参考方案12】:两个 parquet 文件都不存在。 (99.99% 的情况是这个问题。Spark 错误消息通常不太明显) parquet 文件不知何故损坏了,或者根本不是 parquet 文件你只是在加载 parquet 文件,parquet 当然是有效的 架构。否则它不会被保存为镶木地板。这个错误意味着 -
【讨论】:
是的。回想起来,这对于知道如何解释 Spark 异常消息的人来说可能是显而易见的。【参考方案13】:由于文件夹中的文件夹问题,我遇到了这个问题。
例如 folderA.parquet 应该有partion....但是它有folderB.parquet,里面有partition。
分辨率, 将文件传输到父文件夹并删除子文件夹。
【讨论】:
【参考方案14】:你可以用 /* 阅读
outcome2 = sqlc.read.parquet(f"response/*") # work for me
【讨论】:
【参考方案15】:我遇到了一个 正在编写的镶木地板文件。 只需要等待它完全写入。
【讨论】:
以上是关于加载 Parquet 文件时无法推断架构的主要内容,如果未能解决你的问题,请参考以下文章
用户类抛出异常:org.apache.spark.sql.AnalysisException:无法推断 Parquet 的架构。必须手动指定
将 Parquet 数据加载到 PIG 时如何避免 UnsatisfiedLinkError