如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?
Posted
技术标签:
【中文标题】如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?【英文标题】:How to use .filter() operation over multiple time intervals in PySpark? 【发布时间】:2021-07-19 21:30:13 【问题描述】:我想优化以下功能。我正在使用广播内连接,我认为它不够快。
我有一个带有属性的区间 DataFrame:timestamp_start、timestamp_end 以及一个时间序列数据框元组,其属性为:timestamp, value。
然后函数返回属于某个区间的所有值:
def filter_intervals(intervals, df):
df = df.join(broadcast(intervals),
[df.timestamp >= intervals.timestamp_start,
df.timestamp <= intervals.timestamp_end],
how='inner')
return df
我应该如何重写一个更高效的函数?
【问题讨论】:
intervals
有多大(大约在记录中)?它最初是一个 DataFrame 还是您从中创建一个 DataFrame 以在联接中使用它?
假设间隔大小为 500-2500 行。最初它是一个 DataFrame。但如果有帮助,我不妨收集数据。
【参考方案1】:
如果intervals
不是很大,我会尝试创建一个自定义函数is_in_interval(t)
。您可以按timestamp_start
对区间进行初步排序,然后使用二分搜索查找区间(如果有)。此外,我不会加入数据集,而是创建一个 UDF。像这样的:
import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType
def filter_intervals(sorted_intervals, df):
def is_in_interval(t):
...
is_in_interval_udf = F.udf(is_valid_id, BooleanType())
return df.filter(is_in_interval_udf("timestamp"))
在这种情况下,您不必广播intervals
,因为每个执行程序上都会有intervals
的本地副本。但同样,这只有在 intervals
不太大并且适合执行者的内存时才会有效。
这是一篇关于 Python 中的二分搜索的文章: Binary search (bisection) in Python
【讨论】:
以上是关于如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?的主要内容,如果未能解决你的问题,请参考以下文章
如何让 viewForZoomingInScrollView 在 Xcode 中跨多个 UIViewControllers 工作?
如何在 PHPUnit 中跨多个测试模拟测试 Web 服务?