如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?

Posted

技术标签:

【中文标题】如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?【英文标题】:How do I get the maximum of an ArrayType of MapTypes using Spark SQL? 【发布时间】:2017-02-07 17:54:14 【问题描述】:

我有以下 Spark DataFrame:

df = sql.createDataFrame([
        (1, [
                'name': 'john', 'score': '0.8',
                'name': 'johnson', 'score': '0.9',
            ]),
        (2, [
                'name': 'jane', 'score': '0.9',
                'name': 'janine', 'score': '0.4',
            ]),
        (3, [
                'name': 'sarah', 'score': '0.2',
                'name': 'sara', 'score': '0.9',
            ]),
    ], schema=['id', 'names'])

Spark 正确推断架构:

root
 |-- id: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

对于每一行,我想选择得分最高的名称。我可以使用 Python UDF 执行此操作,如下所示:

import pyspark.sql.types as T
import pyspark.sql.functions as F

def top_name(names):
    return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']

top_name_udf = F.udf(top_name, T.StringType())

df.withColumn('top_name', top_name_udf('names')) \
    .select('id', 'top_name') \
    .show(truncate=False)

如您所愿,您会得到:

+---+--------+
|id |top_name|
+---+--------+
|1  |johnson |
|2  |jane    |
|3  |sara    |
+---+--------+

如何使用 Spark SQL 做到这一点?是否可以在没有 Python UDF 的情况下做到这一点,这样数据就不会在Python 和Java 之间序列化?1


1很遗憾,我运行的是 Spark 1.5,无法在 Spark 2.1 中使用 registerJavaFunction

【问题讨论】:

【参考方案1】:

使用sqlContext.registerFunction 方法将你的函数(不是udf)注册到sql。还将您的 df 注册为 sql 表。

sqlContext.registerDataFrameAsTable(df, "names_df")

sqlContext.registerFunction("top_name", top_name,T.StringType())

sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect()

> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')]

【讨论】:

谢谢,这对我有用。但是,是否有可能在没有 Python UDF 的情况下使用纯SQL 来代替?我想避免 Python 和 Java 之间的序列化。不幸的是,我使用的是 Spark 1.5,无法访问 registerJavaFunction 来注册 Scala/Java UDF。 可以用普通的sql吗? sqlcontext.sql( "SELECT FIRST(name) as top_names FROM df GROUP BY score ORDER BY score DESC;" ) 不幸的是,似乎不起作用。只有表格列可用于分组依据。我认为这是不可能的,至少在这个版本的 Spark 中是这样。

以上是关于如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?

如何从 Pyspark 中的 MapType 列获取键和值

如何获得 Spark RDD 的 SQL row_number 等效项?

如何在 Spark UDAF 中实现 fastutils 映射?

从 Spark sql Windows 函数获得意外结果

如何将 MapType(StringType, StringType) 的列转换为 StringType?