我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?

Posted

技术标签:

【中文标题】我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?【英文标题】:How do we register a function in Spark-Scala and Cataloging UDF among the other functions? 【发布时间】:2017-08-11 15:33:08 【问题描述】:

我是使用 Scala 的 Spark 的新手,我按照下面给出的代码 sn-p 进行操作,我的问题是我应该何时注册这样的函数,目的是什么,以及为什么我们在像这样的其他函数中对用户定义的函数进行编目?它在哪里使用?我无法理解我在下面给出的输出即将到来。 如果有人告诉我,我会发自内心地感谢任何人。关于。

scala> spark.udf.register("myUpper", (input:String) => input.toUpperCase)
res0: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> spark.catalog.listFunctions.filter('name like "%upper%").show(false)
+-----+--------+-----------+-----------------------------------------------+-----------+
|name |database|description|className                                      |isTemporary|
+-----+--------+-----------+-----------------------------------------------+-----------+
|upper|null    |null       |org.apache.spark.sql.catalyst.expressions.Upper|true       |
+-----+--------+-----------+-----------------------------------------------+-----------+

【问题讨论】:

【参考方案1】:

在 SQL 中的用法:

scala> spark.sql("SELECT myUpper('foo')").show
+----------------+
|UDF:myUpper(foo)|
+----------------+
|             FOO|
+----------------+

selectExpr:

scala> spark.range(0, 1).selectExpr("myUpper('foo')").show
+----------------+
|UDF:myUpper(foo)|
+----------------+
|             FOO|
+----------------+

org.apache.spark.sql.functions.expr

scala> spark.range(0, 1).select(expr("myUpper('foo')")).show
+----------------+
|UDF:myUpper(foo)|
+----------------+
|             FOO|
+----------------+

和表达式过滤器:

scala> spark.range(0, 1).where("myUpper('foo') = 'FOO'").show
+---+
| id|
+---+
|  0|
+---+

【讨论】:

【参考方案2】:

spark 维护一个 datasetfunctions(内置),无需注册即可使用。

scala> spark.catalog.listFunctions
res0: org.apache.spark.sql.Dataset[org.apache.spark.sql.catalog.Function] = [name: string, database: string ... 3 more fields]

现在在我们注册一个函数之前

scala> spark.catalog.listFunctions.filter('name like "%pper%").show
+-----+--------+-----------+--------------------+-----------+
| name|database|description|           className|isTemporary|
+-----+--------+-----------+--------------------+-----------+
|upper|    null|       null|org.apache.spark....|       true|
+-----+--------+-----------+--------------------+-----------+

我们注册后

scala> spark.udf.register("myUpper", (input:String) => input.toUpperCase)
res2: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> spark.catalog.listFunctions.filter('name like "%pper%").show
+-------+--------+-----------+--------------------+-----------+
|   name|database|description|           className|isTemporary|
+-------+--------+-----------+--------------------+-----------+
|myUpper|    null|       null|                null|       true|
|  upper|    null|       null|org.apache.spark....|       true|
+-------+--------+-----------+--------------------+-----------+

现在我们可以像使用其他函数一样使用myUpper函数了

scala> spark.catalog.listFunctions.show(50, false)
+---------------------+--------+-----------+-----------------------------------------------------------------------+-----------+
|name                 |database|description|className                                                              |isTemporary|
+---------------------+--------+-----------+-----------------------------------------------------------------------+-----------+
|!                    |null    |null       |org.apache.spark.sql.catalyst.expressions.Not                          |true       |
|%                    |null    |null       |org.apache.spark.sql.catalyst.expressions.Remainder                    |true       |
|&                    |null    |null       |org.apache.spark.sql.catalyst.expressions.BitwiseAnd                   |true       |
|*                    |null    |null       |org.apache.spark.sql.catalyst.expressions.Multiply                     |true       |
|+                    |null    |null       |org.apache.spark.sql.catalyst.expressions.Add                          |true       |
|-                    |null    |null       |org.apache.spark.sql.catalyst.expressions.Subtract                     |true       |
.................................................................
...................................................................
|==                   |null    |null       |org.apache.spark.sql.catalyst.expressions.EqualTo                      |true       |
|>                    |null    |null       |org.apache.spark.sql.catalyst.expressions.GreaterThan                  |true       |
|>=                   |null    |null       |org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual           |true       |
|^                    |null    |null       |org.apache.spark.sql.catalyst.expressions.BitwiseXor                   |true       |
|abs                  |null    |null       |org.apache.spark.sql.catalyst.expressions.Abs                          |true       |
|acos                 |null    |null       |org.apache.spark.sql.catalyst.expressions.Acos                         |true       |
|add_months           |null    |null       |org.apache.spark.sql.catalyst.expressions.AddMonths                    |true       |
|and                  |null    |null       |org.apache.spark.sql.catalyst.expressions.And                          |true       |
|approx_count_distinct|null    |null       |org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus|true       |
|array                |null    |null       |org.apache.spark.sql.catalyst.expressions.CreateArray                  |true       |
|array_contains       |null    |null       |org.apache.spark.sql.catalyst.expressions.ArrayContains                |true       |
|ascii                |null    |null       |org.apache.spark.sql.catalyst.expressions.Ascii                        |true       |
|asin                 |null    |null       |org.apache.spark.sql.catalyst.expressions.Asin                         |true       |
.....................................................
.....................................................
.........................................................
|assert_true          |null    |null       |org.apache.spark.sql.catalyst.expressions.AssertTrue                   |true       
|covar_pop            |null    |null       |org.apache.spark.sql.catalyst.expressions.aggregate.CovPopulation      |true       |
|covar_samp           |null    |null       |org.apache.spark.sql.catalyst.expressions.aggregate.CovSample          |true       |
|crc32                |null    |null       |org.apache.spark.sql.catalyst.expressions.Crc32                        |true       |
+---------------------+--------+-----------+-----------------------------------------------------------------------+-----------+

有很多功能。

这些函数可以与withColumnselectaggregations等一起使用。

希望回答对你有帮助

【讨论】:

以上是关于我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark-scala 在 spark 数据帧上执行枢轴?

Spark-Scala:另存为 csv 文件(RDD)[重复]

Spark-Scala 无法推断架构(将输入路径验证推迟到数据源中)

遇到问题--spark-scala---Cannot resolve overloaded method ‘udf‘--Defines a Scala closure of 11 arguments

遇到问题--spark-scala---Cannot resolve overloaded method ‘udf‘--Defines a Scala closure of 11 arguments(

遇到问题--spark-scala---Cannot resolve overloaded method ‘udf‘--Defines a Scala closure of 11 arguments(