Spark DataFrames中的argmax:如何检索具有最大值的行
Posted
技术标签:
【中文标题】Spark DataFrames中的argmax:如何检索具有最大值的行【英文标题】:argmax in Spark DataFrames: how to retrieve the row with the maximum value 【发布时间】:2016-08-07 07:18:57 【问题描述】:给定一个 Spark DataFrame df
,我想在某个数字列 'values'
中找到最大值,并获取达到该值的行。我当然可以这样做:
# it doesn't matter if I use scala or python,
# since I hope I get this done with DataFrame API
import pyspark.sql.functions as F
max_value = df.select(F.max('values')).collect()[0][0]
df.filter(df.values == max_value).show()
但这效率很低,因为它需要两次通过df
。
pandas.Series
/DataFrame
和 numpy.array
有 argmax
/idxmax
方法可以有效地执行此操作(一次通过)。标准python也是如此(内置函数max
接受一个key参数,所以可以用来查找最大值的索引)。
Spark 中的正确方法是什么?请注意,我不介意我是获取所有达到最大值的行,还是这些行的任意(非空!)子集。
【问题讨论】:
一般来说,没有更好的解决方案是跨语言并且可以处理任意数据。 @zero323 为什么不可能通过将 RDD 代码转换为 Scala 并添加适当的元数据以供 Catalyst 使用,从而将 RDD 代码包装在 DataFrame API 中的以下答案中? 这是可能的,但它显然打破了这样的假设:使用 Scala 或 Python 都没有关系您也可以仅使用 SQL 来使用 Orderable 数据类型,但这是一种特殊情况而不是一般解决方案。 【参考方案1】:如果架构is Orderable
(架构仅包含原子/原子数组/可递归排序的结构),您可以使用简单的聚合:
Python:
df.select(F.max(
F.struct("values", *(x for x in df.columns if x != "values"))
)).first()
斯卡拉:
df.select(max(struct(
$"values" +: df.columns.collect case x if x!= "values" => col(x): _*
))).first
否则,您可以减少超过 Dataset
(仅限 Scala),但它需要额外的反序列化:
type T = ???
df.reduce((a, b) => if (a.getAs[T]("values") > b.getAs[T]("values")) a else b)
你也可以oredrBy
和limit(1)
/take(1)
:
斯卡拉:
df.orderBy(desc("values")).limit(1)
// or
df.orderBy(desc("values")).take(1)
Python:
df.orderBy(F.desc('values')).limit(1)
# or
df.orderBy(F.desc("values")).take(1)
【讨论】:
您介意链接到Orderable
架构的解释/定义吗?谷歌搜索只找到了这个答案:)
github.com/apache/spark/blob/…
F 的别名来自:import pyspark.sql.functions as F
,对于阅读问题标题、然后是答案并错过问题上下文的任何其他人!【参考方案2】:
也许这是一个不完整的答案,但您可以使用DataFrame
的内部RDD
,应用max
方法并使用确定的密钥获取最大记录。
a = sc.parallelize([
("a", 1, 100),
("b", 2, 120),
("c", 10, 1000),
("d", 14, 1000)
]).toDF(["name", "id", "salary"])
a.rdd.max(key=lambda x: x["salary"]) # Row(name=u'c', id=10, salary=1000)
【讨论】:
我可以假设 RDD API 的 1 次传递(Scala 以避免 python 开销)比 DataFrame API 的 2 次传递更快吗?还是 Catalyst 可以在这里做一些优化?以上是关于Spark DataFrames中的argmax:如何检索具有最大值的行的主要内容,如果未能解决你的问题,请参考以下文章
Spark - 如何将 JSON 转义的字符串字段解析为 DataFrames 中的 JSON 对象?
将无效数据设置为 Spark DataFrames 中的缺失数据
Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?
来自 Spark / Dataframes 的 AWS SSE-KMS 加密
我们如何在 Spark 中使用 Dataframes(由 structtype 方法创建)合并具有不同列数的 2 个表?