Filtering a spark DataFrame for an n-day window on data partitioned by day
Posted: 2018-01-29 20:58:22

[Question] Suppose I have a DataFrame named transactions with the following integer columns: year, month, day, timestamp, transaction_id.
In [1]: transactions = ctx.createDataFrame([(2017, 12, 1, 10000, 1), (2017, 12, 2, 10001, 2), (2017, 12, 3, 10003, 3), (2017, 12, 4, 10004, 4), (2017, 12, 5, 10005, 5), (2017, 12, 6, 10006, 6)], ('year', 'month', 'day', 'timestamp', 'transaction_id'))
In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+
I would like to define a function filter_date_range that returns a DataFrame consisting of the transaction rows that fall within some date range:
>>> filter_date_range(
        df=transactions,
        start_date=datetime.date(2017, 12, 2),
        end_date=datetime.date(2017, 12, 4)).show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+
Assuming the data is persisted as Hive partitions, partitioned by year, month, and day, what is the most efficient way to perform this kind of filter, which involves date arithmetic? I am looking for a way to do it in a purely DataFrame-ic fashion, without resorting to transactions.rdd, so that Spark can work out that only a subset of the partitions actually needs to be read.
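For illustration, a naive version of such a filter might assemble a date column from the integer partition columns and compare it against the range, as in the sketch below. The body of filter_date_range shown here is only an assumption about what the date arithmetic could look like, and the open question is whether Spark can still prune partitions through an expression like this.

from pyspark.sql import functions as F

def filter_date_range(df, start_date, end_date):
    # Hypothetical naive approach: build a DATE column out of the integer
    # year/month/day columns and filter on it.
    event_date = F.to_date(F.concat_ws("-", F.col("year"), F.col("month"), F.col("day")))
    return df.filter(
        (event_date >= F.lit(start_date.isoformat()).cast("date")) &
        (event_date <= F.lit(end_date.isoformat()).cast("date"))
    )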
[Comments]
Some example data would be useful, so that it is easy to cut and paste to recreate your DataFrame. How to make good reproducible Apache Spark Dataframe examples.
@pault Updated with example data.

[Answer 1] If the data is partitioned like this:
.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet
you can just generate the list of directories to load:
import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)

n = (end_date - start_date).days + 1

base_path = ...

paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()
# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>
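To expose this as the filter_date_range helper the question asks for, the path-generation logic above can be wrapped in a small function. This is only a sketch assuming the year=/month=/day= layout shown earlier; unlike the signature in the question, it takes the table's base path rather than an already-loaded DataFrame, because the point is to control which files get read in the first place.

import datetime

def filter_date_range(spark, base_path, start_date, end_date):
    # Enumerate one partition directory per day in [start_date, end_date].
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in days
    ]
    # basePath keeps year/month/day as columns in the loaded DataFrame.
    return spark.read.option("basePath", base_path).load(paths)

Called as filter_date_range(spark, "/user/hive/warehouse/transactions", datetime.date(2017, 12, 2), datetime.date(2017, 12, 4)).show(), it would read only the three matching day partitions, which is what the PartitionCount: 3 in the physical plan above reflects.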
Reference:
Iterating through a range of dates in Python