尝试从 UDF 执行 spark sql 查询

Posted

技术标签:

【中文标题】尝试从 UDF 执行 spark sql 查询【英文标题】:Trying to execute a spark sql query from a UDF 【发布时间】:2016-08-12 07:31:29 【问题描述】:

我正在尝试使用 scala 在 spark 框架中编写一个内联函数,它将接受一个字符串输入,执行一个 sql 语句并返回一个字符串值

val testfunc: (String=>String)= (arg1:String) => 
val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)                               
 k.head().getString(0)

我正在将此 scala 函数注册为 UDF

   val testFunc_test = udf(testFunc)

我在蜂巢表上有一个数据框

    val df = sqlContext.table("some_table")

然后我在 withColumn 中调用 udf 并尝试将其保存在新的数据框中。

    val new_df = df.withColumn("test", testFunc_test($"col1"))

但是每次我尝试这样做时都会出错

16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,       10.0.1.5): java.lang.NullPointerException
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
    at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)

我对 spark 和 scala 比较陌生。但我不确定为什么这段代码不应该运行。任何见解或解决方法都将受到高度赞赏。

请注意,我没有粘贴整个错误堆栈。如果需要,请告诉我。

【问题讨论】:

【参考方案1】:

您不能在 UDF 中使用 sqlContext - UDF 必须可序列化才能发送到执行程序,并且上下文(可以被认为是与集群的连接)不能被序列化并发送到节点 - 只有驱动程序应用程序(UDF 已定义,但未执行)可以使用sqlContext

看起来您的用例(对表 Y 中的每条记录执行从表 X 中选择)最好通过使用 join 来完成。

【讨论】:

以上是关于尝试从 UDF 执行 spark sql 查询的主要内容,如果未能解决你的问题,请参考以下文章

执行 UDF 失败

spark 能执行udf 不能执行udaf,啥原因

spark 能执行udf 不能执行udaf,啥原因

Spark:DataFrame 上 UDF 的任务不可序列化

对 SparkSQL 中数组列的每个元素执行 UDF(需要另一个 spark 作业)

将 spark.sql 查询转换为 spark/scala 查询