将 spark.sql 查询转换为 spark/scala 查询

Posted

技术标签:

【中文标题】将 spark.sql 查询转换为 spark/scala 查询【英文标题】:Convert spark.sql query to spark/scala query 【发布时间】:2019-04-20 14:49:52 【问题描述】:

我正在使用一些在 scala 中返回 true/false 的业务逻辑在 spark 数据框中添加一列。该实现是使用 UDF 完成的,并且 UDF 有超过 10 个参数,因此我们需要先注册 UDF,然后才能使用它。以下已完成

spark.udf.register("new_col", new_col)

// writing the UDF
val new_col(String, String, ..., Timestamp) => Boolean = (col1: String, col2: String, ..., col12: Timestamp) => 
     if ( ... ) true
     else false

现在,当我尝试编写以下 spark/Scala 作业时,它无法正常工作

val result = df.withColumn("new_col", new_col(col1, col2, ..., col12))

我收到以下错误

<console>:56: error: overloaded method value udf with alternatives:
  (f: AnyRef,dataType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF10[_, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF9[_, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF8[_, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF7[_, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF6[_, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF5[_, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF4[_, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF3[_, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF2[_, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF1[_, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF0[_],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and> ...

另一方面,如果我创建一个临时视图并使用 spark.sql,它可以像下面这样完美地工作

df.createOrReplaceTempView("data")
val result = spark.sql(
    s"""
    SELECT *, new_col(col1, col2, ..., col12) AS new_col FROM data
    """
    )

我错过了什么吗?在 spark/scala 中进行此类查询的方法是什么?

【问题讨论】:

docs.databricks.com/spark/latest/spark-sql/udf-scala.html 【参考方案1】:

DataFramesSparkSQL 中有不同的注册UDF 方法

要在Spark Sql中使用,需要将udf注册为

spark.sqlContext.udf.register("function_name", function)

DataFrames中使用

val my_udf = org.apache.spark.sql.functions.udf(function)

当您使用 spark.sqlContext.udf.register 时,它在 Spark SQL 中可用。

编辑: 以下代码应该可以工作,我只使用了 2 col 位,它应该可以工作到 22 cols

val new_col :(String, String) => Boolean = (col1: String, col2: String) => 
  true


val new_col_udf = udf(new_col)
spark.sqlContext.udf.register("new_col", new_col)

var df = Seq((1,2,3,4,5,6,7,8,9,10,11)).toDF()
df.createOrReplaceTempView("data")
val result = spark.sql(
  s"""SELECT *, new_col(_1, _2) AS new_col FROM data"""
)
result.show()
df = df.withColumn("test", new_col_udf($"_1",$"_2") )
df.show()

【讨论】:

更新了编辑部分的代码,它有 udf 并在 df 和 sql 中工作,请注意 udf 有限制,最多可以接受 22 个参数

以上是关于将 spark.sql 查询转换为 spark/scala 查询的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark SQL 中将 long 类型的列转换为 calendarinterval 类型

Spark sql查询到熊猫问题

使用 Spark 作业服务器的 Spark SQL 作业中的错误“此上下文的作业类型无效”

将 Spark SQL 批处理源转换为结构化流接收器

如何将 scala spark.sql.dataFrame 转换为 Pandas 数据框

在 Spark SQL 中使用 collect_list 和 collect_set