火花在UDF中创建数据框

Posted

技术标签:

【中文标题】火花在UDF中创建数据框【英文标题】:spark create Dataframe in UDF 【发布时间】:2018-11-30 03:54:30 【问题描述】:

我有一个例子,想在 UDF 中创建 Dataframe。类似于下面的那个

import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.feature.VectorAssembler

数据到数据框

    val df = Seq((1,1,34,23,34,56),(2,1,56,34,56,23),(3,0,34,23,23,78),(4,0,23,34,78,23),(5,1,56,23,23,12),
(6,1,67,34,56,34),(7,0,23,23,23,56),(8,0,12,34,45,89),(9,1,12,34,12,34),(10,0,12,34,23,34)).toDF("id","label","tag1","tag2","tag3","tag4")
    val assemblerDF = new VectorAssembler().setInputCols(Array("tag1", "tag2", "tag3","tag4")).setOutputCol("features")
    val data = assemblerDF.transform(df)
    val Array(train,test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val testData=test.toDF    

    val loadmodel=LogisticRegressionModel.load("/user/xu/savemodel")
    sc.broadcast(loadmodel)
    val assemblerFe = new VectorAssembler().setInputCols(Array("a", "b", "c","d")).setOutputCol("features")
    sc.broadcast(assemblerFe)

UDF

    def predict(predictSet:Vector):Double=
        val set=Seq((1,2,3,4)).toDF("a","b","c","d")
        val predata = assemblerFe.transform(set)
        val result=loadmodel.transform(predata)
        result.rdd.take(1)(0)(3).toString.toDouble

    spark.udf.register("predict", predict _)
    testData.registerTempTable("datatable")
    spark.sql("SELECT predict(features) FROM datatable").take(1)

我收到类似的错误

ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 7) [Executor task launch worker for task 7]
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)

WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)

不支持数据框吗?我正在使用 Spark 2.3.0 和 Scala 2.11。谢谢

【问题讨论】:

为什么要将模型应用到UDF中的测试数据上,你可以将模型应用到主程序中的测试数据上。 基于之前的框架,可能无法在UDF中创建Dataframe 【参考方案1】:

如 cmets 中所述,此处不需要 UDF 即可将经过训练的模型应用于测试数据。您可以将模型应用于主程序本身的数据框测试,如下所示:

val df = Seq((1,1,34,23,34,56),(2,1,56,34,56,23),(3,0,34,23,23,78),(4,0,23,34,78,23),(5,1,56,23,23,12),
(6,1,67,34,56,34),(7,0,23,23,23,56),(8,0,12,34,45,89),(9,1,12,34,12,34),(10,0,12,34,23,34)).toDF("id","label","tag1","tag2","tag3","tag4")
val assemblerDF = new VectorAssembler().setInputCols(Array("tag1", "tag2", "tag3","tag4")).setOutputCol("features")
val data = assemblerDF.transform(df)
val Array(train,test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val testData=test.toDF    

val loadmodel=LogisticRegressionModel.load("/user/xu/savemodel")
sc.broadcast(loadmodel)
val assemblerFe = new VectorAssembler().setInputCols(Array("a", "b", "c","d")).setOutputCol("features")
sc.broadcast(assemblerFe)


val set=Seq((1,2,3,4)).toDF("a","b","c","d")
val predata = assemblerFe.transform(set)
val result=loadmodel.transform(predata) // Applying model on predata dataframe. You can apply model on any DataFrame.

result 现在是一个DataFrame,您可以将DataFrame 注册为表格并使用SQL 查询predictLabel 和特征,或者您可以直接从DataFrame 中选择predictLabel 和其他字段。

请注意,UDF 是 Spark SQL 的一项功能,用于定义新的基于列的函数,这些函数扩展了 Spark SQL 用于转换数据集的 DSL 词汇。它不会将 DataFrame 本身作为返回类型返回。除非必要,一般不建议使用 UDF,请参阅:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs-blackbox.html

【讨论】:

以上是关于火花在UDF中创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

从用户定义的函数创建火花数据框列

在火花中创建一个以字长为键、以排序字为值的字典?

基于连接火花创建新的二进制列

如何在火花中将列转换为数组[长]

java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException

UDF Scala 火花语法