如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?

Posted

技术标签:

【中文标题】如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?【英文标题】:How to use .filter() operation over multiple time intervals in PySpark? 【发布时间】:2021-07-19 21:30:13 【问题描述】:

我想优化以下功能。我正在使用广播内连接,我认为它不够快。

我有一个带有属性的区间 DataFrame:timestamp_start、timestamp_end 以及一个时间序列数据框元组,其属性为:timestamp, value。

然后函数返回属于某个区间的所有值:


def filter_intervals(intervals, df):

    df = df.join(broadcast(intervals),
        [df.timestamp >= intervals.timestamp_start,
         df.timestamp <= intervals.timestamp_end],
         how='inner')
    return df

我应该如何重写一个更高效的函数?

【问题讨论】:

intervals 有多大(大约在记录中)?它最初是一个 DataFrame 还是您从中创建一个 DataFrame 以在联接中使用它? 假设间隔大小为 500-2500 行。最初它是一个 DataFrame。但如果有帮助,我不妨收集数据。 【参考方案1】:

如果intervals不是很大,我会尝试创建一个自定义函数is_in_interval(t)。您可以按timestamp_start 对区间进行初步排序,然后使用二分搜索查找区间(如果有)。此外,我不会加入数据集,而是创建一个 UDF。像这样的:

import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType

def filter_intervals(sorted_intervals, df):

  def is_in_interval(t):
    ...

  is_in_interval_udf = F.udf(is_valid_id, BooleanType())

  return df.filter(is_in_interval_udf("timestamp"))

在这种情况下,您不必广播intervals,因为每个执行程序上都会有intervals 的本地副本。但同样,这只有在 intervals 不太大并且适合执行者的内存时才会有效。

这是一篇关于 Python 中的二分搜索的文章: Binary search (bisection) in Python

【讨论】:

以上是关于如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?的主要内容,如果未能解决你的问题,请参考以下文章

如何让 viewForZoomingInScrollView 在 Xcode 中跨多个 UIViewControllers 工作?

如何在 PHPUnit 中跨多个测试模拟测试 Web 服务?

在python中跨多个列表查找列的最小值

如何在 C++ 中跨多个源文件共享变量?

如何在 pyspark.sql.functions.when() 中使用多个条件?

如何在 youtube api 中跨多个频道搜索内容?