Filtering a spark DataFrame for an n-day window on data partitioned by day

Posted: 2018-01-29 20:58:22

【Question】:

Suppose I have a DataFrame named transactions with the following integer columns: year, month, day, timestamp, transaction_id.

In [1]: transactions = ctx.createDataFrame(
   ...:     [(2017, 12, 1, 10000, 1), (2017, 12, 2, 10001, 2), (2017, 12, 3, 10003, 3),
   ...:      (2017, 12, 4, 10004, 4), (2017, 12, 5, 10005, 5), (2017, 12, 6, 10006, 6)],
   ...:     ('year', 'month', 'day', 'timestamp', 'transaction_id'))

In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+

I would like to define a function filter_date_range that returns a DataFrame consisting of the transaction rows that fall within a given date range.

>>> filter_date_range(  
        df = transactions, 
        start_date = datetime.date(2017, 12, 2), 
        end_date = datetime.date(2017, 12, 4)).show()

+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+

Assuming the data is stored in Hive partitions, partitioned by year, month, and day, what is the most efficient way to perform this kind of filter, which involves date arithmetic? I am looking for a way to do this in a purely DataFrame-ic way, without dropping down to transactions.rdd, so that Spark can work out that only a subset of the partitions actually needs to be read.
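
For concreteness, a purely DataFrame-ic filter_date_range might look like the sketch below. This sketch is an illustration added here, not part of the original question: it collapses the partition columns into a single yyyymmdd integer so one predicate covers ranges that cross month or year boundaries. Whether Spark turns such a derived predicate into partition pruning is exactly what is being asked.

from pyspark.sql import functions as F

def filter_date_range(df, start_date, end_date):
    # Collapse year/month/day into one yyyymmdd integer so a single
    # inclusive BETWEEN works across month and year boundaries.
    day_key = F.col("year") * 10000 + F.col("month") * 100 + F.col("day")
    lo = start_date.year * 10000 + start_date.month * 100 + start_date.day
    hi = end_date.year * 10000 + end_date.month * 100 + end_date.day
    return df.filter(day_key.between(lo, hi))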

【Comments】:

Some sample data would be useful, so that we can easily cut and paste to recreate your dataframe. How to make good reproducible Apache Spark Dataframe examples.

@pault Updated with sample data.

【Answer 1】:

If the data is partitioned like this:

.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet

You can simply generate the list of partition directories to load:

import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)
n = (end_date - start_date).days + 1  # number of days in the inclusive range

base_path = ...

# One partition directory per day in [start_date, end_date].
paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()

# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>
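
If you want the filter_date_range API from the question, a hypothetical wrapper around the same idea could look like this (the function name and signature are illustrative, not part of the original answer):

import datetime

def filter_date_range(spark, base_path, start_date, end_date):
    # Build one partition directory per day in the inclusive range and
    # load only those, so the other partitions are never listed or read.
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in days
    ]
    return spark.read.option("basePath", base_path).load(paths)

One caveat worth checking: by default spark.read.load raises an error if one of the generated day directories does not exist, so gaps in the data may need to be removed from paths first.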

Reference:

Iterating through a range of dates in Python
