如何在 Spark ML 中为分类创建正确的数据框

Posted

技术标签:

【中文标题】如何在 Spark ML 中为分类创建正确的数据框【英文标题】:How to create correct data frame for classification in Spark ML 【发布时间】:2015-09-10 18:55:18 【问题描述】:

我正在尝试使用 Spark ML api 运行随机森林分类,但在创建正确的数据框输入到管道时遇到问题。

这里是示例数据:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

agehours_per_week 是整数,而包括标签 salaryRange 在内的其他特征是分类(字符串)

Spark csv library 可以像这样加载这个 csv 文件(我们称之为 sample.csv):

val data = sqlContext.csvFile("/home/dusan/sample.csv")

默认情况下,所有列都作为字符串导入,因此我们需要将“age”和“hours_per_week”更改为 Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

只是为了检查架构现在的样子:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

然后让我们设置交叉验证器和管道:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

运行此行时出现错误:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException:字段“特征”不存在。

可以在 RandomForestClassifier 中设置标签列和特征列,但是我有 4 列作为预测变量(特征),而不仅仅是一个。

我应该如何组织我的数据框,以便正确组织标签和特征列?

为了您的方便,这里是完整的代码:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.Vector, Vectors


object SampleClassification 

  def main(args: Array[String]): Unit = 

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
    val toInt    = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))


    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    //java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed) 
  


感谢您的帮助!

【问题讨论】:

不知道 scala 语言,但是你在哪里设置数据集中的标签和特征,比如 LabeledPoint(labels, list(features)) ,查看spark.apache.org/docs/latest/mllib-linear-methods.html中的示例 @ABC,请查看我在下面问题中的评论。 检查这个例子github.com/apache/spark/blob/master/examples/src/main/scala/org/… where val model = pipeline.fit(training.toDF()) 在管道中使用数据帧 【参考方案1】:

从 Spark 1.4 开始,您可以使用 Transformer org.apache.spark.ml.feature.VectorAssembler。 只需提供您想成为特征的列名。

val assembler = new VectorAssembler()
  .setInputCols(Array("col1", "col2", "col3"))
  .setOutputCol("features")

并将其添加到您的管道中。

【讨论】:

tuxdna's answer 解释了问题的细节,以及解决方案的外观。 这个答案显示了完成它的好方法。 这是行不通的,因为有些特性是字符串类型的。严格数值数据的绝佳解决方案。 @gstvolvr 您需要先使用StringIndexer 将字符串转换为数字。为了清楚起见,可能值得将此步骤添加到答案中。【参考方案2】:

您只需要确保您的数据框中有一个"features" 列,其类型为VectorUDF,如下所示:

scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]

scala> val cmModel = cv.fit(df2) 
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)

EDIT1

本质上,您的数据框中需要有两个字段“特征”用于特征向量,“标签”用于实例标签。实例的类型必须为 Double

要使用Vector 类型创建“功能”字段,请首先创建udf,如下所示:

val toVec4    = udf[Vector, Int, Int, String, String]  (a,b,c,d) => 
  val e3 = c match 
    case "hs-grad" => 0
    case "bachelors" => 1
    case "masters" => 2
  
  val e4 = d match case "male" => 0 case "female" => 1
  Vectors.dense(a, b, e3, e4) 

现在还要对“标签”字段进行编码,创建另一个udf,如下所示:

val encodeLabel    = udf[Double, String]( _ match  case "A" => 0.0 case "B" => 1.0 )

现在我们使用这两个udf 转换原始数据帧:

val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")

请注意,数据框中可能存在额外的列/字段,但在这种情况下,我只选择了 featureslabel

scala> df.show()
+-------------------+-----+
|           features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]|  0.0|
|[28.0,40.0,1.0,1.0]|  0.0|
|[52.0,45.0,0.0,0.0]|  1.0|
|[31.0,50.0,2.0,1.0]|  1.0|
|[42.0,40.0,1.0,0.0]|  1.0|
+-------------------+-----+

现在由您来为您的学习算法设置正确的参数以使其发挥作用。

【讨论】:

您是否有机会展示我如何从我的数据中创建 VectorUDF 类型的名为“特征”的列? @DusanGrubjesic:我添加了代码示例。请检查EDIT1 这真的很棒!我只是不确定我们如何将信息从 ML 传递给分类器,现在这些 e3 和 e4 是分类特征而不是数字?因为在“低级”mllib api 中,可以通过 categoricalFeaturesInfo 传递分类特征的索引和类别数。在“高级” ml api 中,这应该直接从模式中提取。 在这种情况下,Double 值(全部为数字)的结果 Vector 构成您的特征向量。您可能想要进行标准化、ohe-hot 编码、规范化……无论您看起来适合您的算法,但您的特征向量中的值必须全部为 Double。您指的是哪个低级 API? @DusanGrubjesic:我很高兴它有帮助。并感谢 mlllib 和 ml 之间的区别 :-)【参考方案3】:

根据关于 mllib - 随机树的 spark 文档,在我看来,您应该定义正在使用的特征图,并且点应该是标记点。

这将告诉算法应该使用哪一列作为预测,哪一列是特征。

https://spark.apache.org/docs/latest/mllib-decision-tree.html

【讨论】:

mllib 包中有一个旧的 api,这些点确实应该是 LabeledPoint。但是,我正在尝试使用位于 ml 包中的新 api,因为它支持管道、交叉验证等。这个新的 api 使用 DataFrame 作为输入。例如比较这两个:来自 ml 的 RandomForestClassifier,它使用来自 mllib 的 DataFrame 和 RandomForestModel (spark.apache.org/docs/1.4.0/api/scala/…)

以上是关于如何在 Spark ML 中为分类创建正确的数据框的主要内容,如果未能解决你的问题,请参考以下文章

Spark ML 错误:编号不正确。使用线性 SVC 时检测到的类

如何在 spark ml 中处理决策树、随机森林的分类特征?

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

在scala spark中为两个不同的数据框创建视图

在 Spark 的嵌套 XML 中为来自父数据框的子数据框添加额外的列

如何在 Spark SQL 中为每个组创建 z 分数