从分区拼花文件中读取 DataFrame

Posted

技术标签:

【中文标题】从分区拼花文件中读取 DataFrame【英文标题】:Reading DataFrame from partitioned parquet file 【发布时间】:2016-02-12 12:52:35 【问题描述】:

如何读取条件为数据框的分区拼花,

这很好用,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")

day=1 to day=30 的分区是否可以读取(day = 5 to 6)day=5,day=6 之类的内容,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")

如果我输入*,它会给我所有 30 天的数据,而且它太大了。

【问题讨论】:

【参考方案1】:

sqlContext.read.parquet 可以将多个路径作为输入。如果您只想要day=5day=6,您可以简单地添加两条路径,例如:

val dataframe = sqlContext
      .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
                    "file:///your/path/data=jDD/year=2015/month=10/day=6/")

如果您在day=X 下有文件夹,例如country=XX,则country 将自动添加为dataframe 中的一列。

编辑:从 Spark 1.6 开始,需要提供“基本路径”选项,以便 Spark 自动生成列。在 Spark 1.6.x 中,必须像这样重写上述内容以创建包含“数据”、“年”、“月”和“日”列的数据框:

val dataframe = sqlContext
     .read
     .option("basePath", "file:///your/path/")
     .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
                    "file:///your/path/data=jDD/year=2015/month=10/day=6/")

【讨论】:

首先感谢您的回复,我正在寻找更简单的方法。如果以这种方式将大约 20 天作为子集将有点困难。我会经常过滤以检查数据的准确性。 那为什么不简单地将val dataframe = sqlContext.read.parquet("file:///your/path/data=jDD/year=2015/month=10/")? day`作为列添加到数据框中,然后您可以对其进行过滤。 实际上,它运行的数据非常庞大。数据是从 2007 年到 2015 年。平均处理和存储 50 亿行原始日志。我会被要求按需提供特定的数据报告 对,所以你要做的第一件事就是filter 操作。由于 Spark 会进行惰性评估,因此您应该对数据集的大小没有任何问题。过滤器将在任何操作之前应用,并且只有您感兴趣的数据会保存在内存中。 好吧,看来唯一的答案就是这个!【参考方案2】:

如果你想阅读多天,例如day = 5day = 6,并且想在路径本身中提及范围,可以使用通配符:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5,6/*")

通配符也可用于指定日期范围:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=[5-10]/*")

这匹配从 5 到 10 的所有日期。

【讨论】:

这是 scala 专用的吗?我正在尝试使用 pyspark,它适用于 符号,但不适用于 []。我正在尝试在一个范围内阅读。 这是否适用于以相同的方式指定年份和月份的范围,例如“file:///your/path/data=mydata/year=[2015-2018]/month=[1- 6]/day=[5-10]/*") 奇怪的是pyspark中没有实现第二种方法。拥有它真的很方便。【参考方案3】:

您需要提供mergeSchema = true 选项。如下所述(这是从 1.6.0 开始的):

val dataframe = sqlContext.read.option("mergeSchema", "true").parquet("file:///your/path/data=jDD")

这会将所有 parquet 文件读入数据帧,并在数据帧数据中创建年、月和日列。

参考:https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging

【讨论】:

仅当模式不同时才需要模式合并,如果它们相同则不需要。【参考方案4】:

就我的 pyspark 而言:

sdf_table = spark.read.parquet("s3://bucket/table/**/*.parquet")

** 是 parquet 的所有分区(一个 glob 表达式)

注意读取存储桶 "table/" 中的所有文件 parquet ,所以对其他文件保持警告

【讨论】:

以上是关于从分区拼花文件中读取 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

将 PubSub 流保存到 GCS 中的分区拼花文件

从 Impala 分区拼花表创建文本表

在 AzureML 上保存分区拼花

Hive 不读取 Spark 生成的分区 parquet 文件

编写拼花文件时如何避免空文件?

Spark缓慢重新分区许多小文件