使用 Pyspark 直接在分区文件上运行查询

Posted

技术标签:

【中文标题】使用 Pyspark 直接在分区文件上运行查询【英文标题】:Running queries directly on partitioned files using Pyspark 【发布时间】:2018-11-03 00:32:33 【问题描述】:

Here 提到我们可以像这样直接在单个文件上运行查询。

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

我的问题是我们可以对一组分区为 yyyy/mm/dd 的文件执行此操作吗? 在同一页面中,它说 spark 可以自动发现分区。我不确定如何处理这种类型的查询。

【问题讨论】:

【参考方案1】:

如果您有一个嵌套partition fields镶木地板 yyyy/mm/dd 然后在查询中提供表位置目录,然后 spark 可以读取所有 嵌套分区并创建数据框

示例:

我有一个包含 3 个分区字符串字段(年、月、日)的表

hive> desc i;

+--------------------------+-----------------------+-----------------------+--+
|         col_name         |       data_type       |        comment        |
+--------------------------+-----------------------+-----------------------+--+
| id                       | int                   |                       |
| year                     | string                |                       |
| month                    | string                |                       |
| dd                       | string                |                       |
|                          | NULL                  | NULL                  |
| # Partition Information  | NULL                  | NULL                  |
| # col_name               | data_type             | comment               |
|                          | NULL                  | NULL                  |
| year                     | string                |                       |
| month                    | string                |                       |
| dd                       | string                |                       |
+--------------------------+-----------------------+-----------------------+--+

现在我正在尝试通过读取 parquet 文件来加载数据(表位置是 /apps/hive/warehouse/i)。

HDFS 中的文件:

hadoop fs -ls -R /apps/hive/warehouse/i
drwxrwxrwt   - hive hadoop          0 2018-11-03 00:10 /apps/hive/warehouse/i/year=2018
drwxrwxrwt   - hive hadoop          0 2018-11-03 00:10 /apps/hive/warehouse/i/year=2018/month=10
drwxrwxrwt   - hive hadoop          0 2018-11-03 00:10 /apps/hive/warehouse/i/year=2018/month=10/dd=15
-rwxrwxrwt   3 hive hadoop        214 2018-11-03 00:10 /apps/hive/warehouse/i/year=2018/month=10/dd=15/000000_0

现在从 pyspark 读取数据:

>>> df=spark.sql("select * from parquet.`/apps/hive/warehouse/i`")
>>> df.columns
['id', 'year', 'month', 'dd']
>>> df.show(10,False)
+---+----+-----+---+
|id |year|month|dd |
+---+----+-----+---+
|1  |2018|10   |15 |
+---+----+-----+---+

如果你只想读取特定年份的文件,那么

>>> df=spark.sql("select * from parquet.`/apps/hive/warehouse/i/year=2018`")
>>> df.columns
['id', 'month', 'dd']
>>> df.show(10,False)
+---+-----+---+
|id |month|dd |
+---+-----+---+
|1  |10   |15 |
+---+-----+---+

year 没有列,因为我们不使用 pyspark 读取年份数据,我们是来自 parquet 文件的 reading only the month,dd and id fields 数据。

更新:

如果是 csv 文件,我们可以关注 similar approach as above,我们不需要有 year,month,day 字段,因为 spark 会在读取目录中的数据时创建这些字段。

读取 CSV 文件:

#we are reading the csv files with header  
>>> spark.sql("""CREATE OR REPLACE TEMPORARY view df
             USING csv 
             OPTIONS (header "true", path "/apps/hive/warehouse/i")""")

>>> df=spark.sql("select * from df")
>>> df.show(10,False)
+---+-----+---+
|id |month|dd |
+---+-----+---+
|1  |10   |15 |
+---+-----+---+

【讨论】:

谢谢!但是,我对整个 pyspark 环境还是陌生的。首先,我正在处理 CSV 文件。它也适用于 CSV 文件吗?我们是否必须将年、月和日作为 csv 文件中的列来执行此操作?当我说分区时,这意味着 csv 文件只是根据创建的年月和日期分成文件夹。最后,在运行查询之前是否需要运行这个命令hadoop fs -ls -R /apps/hive/warehouse/i @user2939212,是的,我们也可以对 csv 文件使用类似的方法,请检查上面的更新答案。您不必运行hadoop fs -ls -R /apps/hive/warehouse/i 只是为了演示目的我已经运行该命令来显示目录结构:) 非常感谢,这里的信息对我有用。 ***.com/questions/46240271/…。您知道如何找到有关这些命令的文档吗?我找不到关于这些陈述的任何文档。 @user2939212,文档无处不在(official docs will have some basic ways to load the data高级用法我指的是堆栈溢出文档:)),请查看此官方文档数据块,docs.databricks.com/spark/latest/data-sources/…

以上是关于使用 Pyspark 直接在分区文件上运行查询的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 遍历每一行数据帧并运行配置单元查询

如何使用 jar 文件运行 pyspark?

pyspark - 分区数据的计算(使用“附加”模式创建)慢

使用 pyspark 对 parquet 文件进行分区和重新分区

PySpark 根据特定列重新分区

PySpark 中 JDBC 上的自定义分区