将自定义函数应用于 spark 数据框组
Posted
技术标签:
【中文标题】将自定义函数应用于 spark 数据框组【英文标题】:Apply a custom function to a spark dataframe group 【发布时间】:2017-01-28 17:40:24 【问题描述】:我有一个非常大的包含这些列的时间序列数据表:
时间戳 车牌 UberRide# 速度LicensePlate/UberRide 数据的每个集合都应考虑整个数据集进行处理。换句话说,我不需要逐行处理数据,而是按 (LicensePlate/UberRide) 分组的所有行一起处理。
我计划将 spark 与 dataframe api 一起使用,但我对如何对 spark 分组数据帧执行自定义计算感到困惑。
我需要做的是:
-
获取所有数据
按某些列分组
Foreach 火花数据帧组应用 f(x)。为每个组返回一个自定义对象
通过应用 g(x) 并返回单个自定义对象来获取结果
如何执行第 3 步和第 4 步?关于我应该使用哪个 spark API(dataframe、dataset、rdd、也许是 pandas...)的任何提示?
整个工作流程如下:
【问题讨论】:
Pandas 不是 Spark 的一部分,您可以使用DataFrame
但您将使用 have to do it in Scala 和 add Python wrapper,RDD 应该可以正常工作。
不能直接使用Spark吗?我正在使用 Spark 1.6.2
如果你指的是 PySpark,那么就像我说的那样 - RDD 应该可以正常工作。
在向新手提问之前,我会进一步调查究竟如何使用 rdds... :-)
@NischalHp : df.rdd.keyBy(lambda x: (x['key1'], x['key2'])) \ .groupByKey() \ .map(lambda groupped_data: my_map_fn(分组数据))
【参考方案1】:
虽然 Spark 提供了一些与 Pandas 集成的方法,但它并没有使 Pandas 成为分布式的。因此,无论您在 Spark 中使用 Pandas 做什么都是简单的本地操作(在转换中使用时对驱动程序或执行程序)。
如果您正在寻找具有类似 Pandas API 的分布式系统,您应该查看dask
。
Aggregators
处理分组 Datasets
但 API 的这一部分只能在 Scala 中直接访问。创建一个时write a Python wrapper 并不难。
RDD API 提供了许多函数,可用于分组执行操作,从低级 repartition
/ repartitionAndSortWithinPartitions
开始,以许多 *byKey
方法结束(combineByKey
、groupByKey
、@ 987654334@等)。
哪一个适用于您的情况取决于您要应用的函数的属性(它是否具有关联性和可交换性,是否可以在流上工作,是否需要特定的顺序)。
最通用但效率低的方法可以总结如下:
h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
其中f
从值映射到key
,g
对应于每组聚合,h
是最终合并。大多数时候你可以做得比这更好,所以它应该只作为最后的手段。
相对复杂的逻辑可以用DataFrames
/Spark SQL和window functions来表达。
另见Applying UDFs on GroupedData in PySpark (with functioning python example)
【讨论】:
【参考方案2】:您正在寻找的东西自 Spark 2.3 以来就存在:Pandas 矢量化 UDF。它允许对 DataFrame 进行分组并使用 pandas 应用自定义转换,分布在每个组中:
df.groupBy("groupColumn").apply(myCustomPandasTransformation)
它很容易使用,所以我会输入a link to Databricks' presentation of pandas UDF。
但是,我还不知道在 Scala 中进行分组转换的实用方法,因此欢迎提供任何其他建议。
编辑:在 Scala 中,您可以使用 Dataset 的 groupByKey
+ mapGroups
/flatMapGroups
实现与 Spark 早期版本相同的功能。
【讨论】:
以上是关于将自定义函数应用于 spark 数据框组的主要内容,如果未能解决你的问题,请参考以下文章
Spark scala Dataframe:如何将自定义类型应用于现有数据框?