如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?
Posted
技术标签:
【中文标题】如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?【英文标题】:How do I get the maximum of an ArrayType of MapTypes using Spark SQL? 【发布时间】:2017-02-07 17:54:14 【问题描述】:我有以下 Spark DataFrame:
df = sql.createDataFrame([
(1, [
'name': 'john', 'score': '0.8',
'name': 'johnson', 'score': '0.9',
]),
(2, [
'name': 'jane', 'score': '0.9',
'name': 'janine', 'score': '0.4',
]),
(3, [
'name': 'sarah', 'score': '0.2',
'name': 'sara', 'score': '0.9',
]),
], schema=['id', 'names'])
Spark 正确推断架构:
root
|-- id: long (nullable = true)
|-- names: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
对于每一行,我想选择得分最高的名称。我可以使用 Python UDF 执行此操作,如下所示:
import pyspark.sql.types as T
import pyspark.sql.functions as F
def top_name(names):
return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']
top_name_udf = F.udf(top_name, T.StringType())
df.withColumn('top_name', top_name_udf('names')) \
.select('id', 'top_name') \
.show(truncate=False)
如您所愿,您会得到:
+---+--------+
|id |top_name|
+---+--------+
|1 |johnson |
|2 |jane |
|3 |sara |
+---+--------+
如何使用 Spark SQL 做到这一点?是否可以在没有 Python UDF 的情况下做到这一点,这样数据就不会在Python 和Java 之间序列化?1
1很遗憾,我运行的是 Spark 1.5,无法在 Spark 2.1 中使用 registerJavaFunction
。
【问题讨论】:
【参考方案1】:使用sqlContext.registerFunction
方法将你的函数(不是udf)注册到sql。还将您的 df 注册为 sql 表。
sqlContext.registerDataFrameAsTable(df, "names_df")
sqlContext.registerFunction("top_name", top_name,T.StringType())
sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect()
> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')]
【讨论】:
谢谢,这对我有用。但是,是否有可能在没有 Python UDF 的情况下使用纯SQL 来代替?我想避免 Python 和 Java 之间的序列化。不幸的是,我使用的是 Spark 1.5,无法访问registerJavaFunction
来注册 Scala/Java UDF。
可以用普通的sql吗? sqlcontext.sql( "SELECT FIRST(name) as top_names FROM df GROUP BY score ORDER BY score DESC;" )
不幸的是,似乎不起作用。只有表格列可用于分组依据。我认为这是不可能的,至少在这个版本的 Spark 中是这样。以上是关于如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spark SQL 获得 MapTypes 的 ArrayType 的最大值?
如何获得 Spark RDD 的 SQL row_number 等效项?