从分区拼花文件中读取 DataFrame
Posted
技术标签:
【中文标题】从分区拼花文件中读取 DataFrame【英文标题】:Reading DataFrame from partitioned parquet file 【发布时间】:2016-02-12 12:52:35 【问题描述】:如何读取条件为数据框的分区拼花,
这很好用,
val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")
有day=1 to day=30
的分区是否可以读取(day = 5 to 6)
或day=5,day=6
之类的内容,
val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")
如果我输入*
,它会给我所有 30 天的数据,而且它太大了。
【问题讨论】:
【参考方案1】:sqlContext.read.parquet
可以将多个路径作为输入。如果您只想要day=5
和day=6
,您可以简单地添加两条路径,例如:
val dataframe = sqlContext
.read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
"file:///your/path/data=jDD/year=2015/month=10/day=6/")
如果您在day=X
下有文件夹,例如country=XX
,则country
将自动添加为dataframe
中的一列。
编辑:从 Spark 1.6 开始,需要提供“基本路径”选项,以便 Spark 自动生成列。在 Spark 1.6.x 中,必须像这样重写上述内容以创建包含“数据”、“年”、“月”和“日”列的数据框:
val dataframe = sqlContext
.read
.option("basePath", "file:///your/path/")
.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
"file:///your/path/data=jDD/year=2015/month=10/day=6/")
【讨论】:
首先感谢您的回复,我正在寻找更简单的方法。如果以这种方式将大约 20 天作为子集将有点困难。我会经常过滤以检查数据的准确性。 那为什么不简单地将val dataframe = sqlContext.read.parquet("file:///your/path/data=jDD/year=2015/month=10/")?
day`作为列添加到数据框中,然后您可以对其进行过滤。
实际上,它运行的数据非常庞大。数据是从 2007 年到 2015 年。平均处理和存储 50 亿行原始日志。我会被要求按需提供特定的数据报告
对,所以你要做的第一件事就是filter
操作。由于 Spark 会进行惰性评估,因此您应该对数据集的大小没有任何问题。过滤器将在任何操作之前应用,并且只有您感兴趣的数据会保存在内存中。
好吧,看来唯一的答案就是这个!【参考方案2】:
如果你想阅读多天,例如day = 5
和day = 6
,并且想在路径本身中提及范围,可以使用通配符:
val dataframe = sqlContext
.read
.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5,6/*")
通配符也可用于指定日期范围:
val dataframe = sqlContext
.read
.parquet("file:///your/path/data=jDD/year=2015/month=10/day=[5-10]/*")
这匹配从 5 到 10 的所有日期。
【讨论】:
这是 scala 专用的吗?我正在尝试使用 pyspark,它适用于
符号,但不适用于 []
。我正在尝试在一个范围内阅读。
这是否适用于以相同的方式指定年份和月份的范围,例如“file:///your/path/data=mydata/year=[2015-2018]/month=[1- 6]/day=[5-10]/*")
奇怪的是pyspark中没有实现第二种方法。拥有它真的很方便。【参考方案3】:
您需要提供mergeSchema = true
选项。如下所述(这是从 1.6.0 开始的):
val dataframe = sqlContext.read.option("mergeSchema", "true").parquet("file:///your/path/data=jDD")
这会将所有 parquet 文件读入数据帧,并在数据帧数据中创建年、月和日列。
参考:https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging
【讨论】:
仅当模式不同时才需要模式合并,如果它们相同则不需要。【参考方案4】:就我的 pyspark 而言:
sdf_table = spark.read.parquet("s3://bucket/table/**/*.parquet")
** 是 parquet 的所有分区(一个 glob 表达式)
注意读取存储桶 "table/" 中的所有文件 parquet ,所以对其他文件保持警告
【讨论】:
以上是关于从分区拼花文件中读取 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章