将自定义函数应用于 spark 数据框组

Posted

技术标签:

【中文标题】将自定义函数应用于 spark 数据框组【英文标题】:Apply a custom function to a spark dataframe group 【发布时间】:2017-01-28 17:40:24 【问题描述】:

我有一个非常大的包含这些列的时间序列数据表:

时间戳 车牌 UberRide# 速度

LicensePlate/UberRide 数据的每个集合都应考虑整个数据集进行处理。换句话说,我不需要逐行处理数据,而是按 (LicensePlate/UberRide) 分组的所有行一起处理。

我计划将 spark 与 dataframe api 一起使用,但我对如何对 spark 分组数据帧执行自定义计算感到困惑。

我需要做的是:

    获取所有数据 按某些列分组 Foreach 火花数据帧组应用 f(x)。为每个组返回一个自定义对象 通过应用 g(x) 并返回单个自定义对象来获取结果

如何执行第 3 步和第 4 步?关于我应该使用哪个 spark API(dataframe、dataset、rdd、也许是 pandas...)的任何提示?

整个工作流程如下:

【问题讨论】:

Pandas 不是 Spark 的一部分,您可以使用 DataFrame 但您将使用 have to do it in Scala 和 add Python wrapper,RDD 应该可以正常工作。 不能直接使用Spark吗?我正在使用 Spark 1.6.2 如果你指的是 PySpark,那么就像我说的那样 - RDD 应该可以正常工作。 在向新手提问之前,我会进一步调查究竟如何使用 rdds... :-) @NischalHp : df.rdd.keyBy(lambda x: (x['key1'], x['key2'])) \ .groupByKey() \ .map(lambda groupped_data: my_map_fn(分组数据)) 【参考方案1】:

虽然 Spark 提供了一些与 Pandas 集成的方法,但它并没有使 Pandas 成为分布式的。因此,无论您在 Spark 中使用 Pandas 做什么都是简单的本地操作(在转换中使用时对驱动程序或执行程序)。

如果您正在寻找具有类似 Pandas API 的分布式系统,您应该查看dask

You can define User Defined Aggregate functions or Aggregators 处理分组 Datasets 但 API 的这一部分只能在 Scala 中直接访问。创建一个时write a Python wrapper 并不难。

RDD API 提供了许多函数,可用于分组执行操作,从低级 repartition / repartitionAndSortWithinPartitions 开始,以许多 *byKey 方法结束(combineByKeygroupByKey、@ 987654334@等)。

哪一个适用于您的情况取决于您要应用的函数的属性(它是否具有关联性和可交换性,是否可以在流上工作,是否需要特定的顺序)。

最通用但效率低的方法可以总结如下:

h(rdd.keyBy(f).groupByKey().mapValues(g).collect())

其中f 从值映射到keyg 对应于每组聚合,h 是最终合并。大多数时候你可以做得比这更好,所以它应该只作为最后的手段。

相对复杂的逻辑可以用DataFrames/Spark SQL和window functions来表达。

另见Applying UDFs on GroupedData in PySpark (with functioning python example)

【讨论】:

【参考方案2】:

您正在寻找的东西自 Spark 2.3 以来就存在:Pandas 矢量化 UDF。它允许对 DataFrame 进行分组并使用 pandas 应用自定义转换,分布在每个组中:

df.groupBy("groupColumn").apply(myCustomPandasTransformation)

它很容易使用,所以我会输入a link to Databricks' presentation of pandas UDF。

但是,我还不知道在 Scala 中进行分组转换的实用方法,因此欢迎提供任何其他建议。

编辑:在 Scala 中,您可以使用 Dataset 的 groupByKey + mapGroups/flatMapGroups 实现与 Spark 早期版本相同的功能。

【讨论】:

以上是关于将自定义函数应用于 spark 数据框组的主要内容,如果未能解决你的问题,请参考以下文章

Spark 将自定义模式应用于 DataFrame

Spark scala Dataframe:如何将自定义类型应用于现有数据框?

如何将自定义函数应用于 xarray.Dataset 坐标的每个值?

将自定义函数应用于 sklearn 中的稀疏矩阵

如何将自定义函数加载到 R 中的 foreach 循环中?

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用