spark ml特征转换操作StringIndexerIndexToStringVectorIndexeroneHotEncoderBucketizerQuantileDiscretizer

Posted 辉常努腻

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark ml特征转换操作StringIndexerIndexToStringVectorIndexeroneHotEncoderBucketizerQuantileDiscretizer相关的知识,希望对你有一定的参考价值。

文章目录

特征转换方法

StringIndexer

StringIndexer(字符串-索引变换)是一个估计器,是将字符串列编码为标签索引列。索引位于[0,numLabels),按标签频率排序,频率最高的排0,依次类推,因此最常见的标签获取索引是0。

VectorIndexer

VectorIndexer(向量-索引变换)是一种估计器,能够提高决策树或随机森林等ML方法的分类效果,是对数据集特征向量中的类别(离散值)特征进行编号。它能够自动判断哪些特征是离散值型的特征,并对他们进行编号。

IndexToString

IndexToString(索引-字符串变换)是一种转换器,与StringIndexer对应,能将指标标签映射回原始字符串标签。一个常见的用例(下文的例子就是如此)是使用StringIndexer从标签生成索引,使用这些索引训练模型,并从IndexToString的预测索引列中检索原始标签。

一、StringIndexer

  1. StringIndexer本质上是对String类型–>index( number);

  2. 如果是:数值(numeric)–>index(number),实际上是对把数值先进行了类型转换( cast numeric to string and then index the string values.),也就是说无论是String,还是数值,都可以重新编号(Index);

  3. 利用获得的模型转化新数据集时,可能遇到异常情况

    在使用Spark MLlib协同过滤ALS API的时候发现Rating的三个参数:用户id,商品名称,商品打分,前两个都需要是Int值。那么问题来了,当你的用户id,商品名称是String类型的情况下,我们必须寻找一个方法可以将海量String映射为数字类型。好在Spark MLlib可以answer这一切。

    StringIndexer 将一列字符串标签编码成一列下标标签,下标范围在[0, 标签数量),顺序是标签的出现频率。所以最经常出现的标签获得的下标就是0。如果输入列是数字的,我们会将其转换成字符串,然后将字符串改为下标。当下游管道组成部分,比如说Estimator 或Transformer 使用将字符串转换成下标的标签时,你必须将组成部分的输入列设置为这个将字符串转换成下标后的列名。很多情况下,你可以使用setInputCol设置输入列。

import org.apache.spark.ml.feature.StringIndexer;
val spark = SparkSession.builder().appName("db").master("local[*]").getOrCreate()
 
val df = spark.createDataFrame(
      Seq((0,"a"),(1,"b"),(2,"c"),(3,"a"),(4,"a"),(5,"c"))
    ).toDF("id","category")
 
val indexer =new StringIndexer()
       .setInputCol("category")
       .setOutputCol("categoryIndex")
 
val indexed = indexer.fit(df) // 训练一个StringIndexer => StringIndexerModel
                 .transform(df)  // 用 StringIndexerModel transfer 数据集
 
indexed.show()



此外,当你针对一个数据集训练了一个StringIndexer,然后使用其去transform另一个数据集的时候,针对不可见的标签StringIndexer 有两个应对策略:

针对训练集中没有出现的字符串值,spark提供了几种处理的方法:

error,直接抛出异常
skip,跳过该样本数据( skip the row containing the unseen label entirely跳过包含不可见标签的这一行)
keep,使用一个新的最大索引,来表示所有未出现的值
throw an exception (which is the default)默认是抛出异常

val df2 = spark.createDataFrame(
      Seq((0,"a"),(1,"b"),(2,"c"),(3,"d"),(4,"e"),(5,"f"))
    ).toDF("id","category")
 
 
val indexed2 = indexer.fit(df)
.setHandleInvalid("skip") // 不匹配就跳过
.transform(df2) // 用 不匹配的stringIndexModel 来 transfer 数据集
 
 
indexed2.show()

二、IndexToString

IndexToString 和StringIndexer是对称的,它将一列下标标签映射回一列包含原始字符串的标签。常用的场合是使用StringIndexer生产下标,通过这些下标训练模型,通过IndexToString从预测出的下标列重新获得原始标签。不过,你也可以使用你自己的标签。

import org.apache.spark.ml.feature.IndexToString
 
val converter =new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
 
 
val converted = converter.transform(indexed) // class IndexToString extends Transformer
 
converted.select("id","originalCategory").show()

package xingoo.ml.features.tranformer
 
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.IndexToString, StringIndexer
import org.apache.spark.sql.SparkSession
 
object IndexToString2 
  def main(args: Array[String]): Unit = 
    val spark = SparkSession.builder().master("local[*]").appName("dct").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
 
    val df = spark.createDataFrame(Seq(
      (0, "a"),
      (1, "b"),
      (2, "c"),
      (3, "a"),
      (4, "a"),
      (5, "c")
    )).toDF("id", "category")
 
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
      .fit(df)
    val indexed = indexer.transform(df)
 
    println(s"Transformed string column '$indexer.getInputCol' " +
      s"to indexed column '$indexer.getOutputCol'")
    indexed.show()
 
    val inputColSchema = indexed.schema(indexer.getOutputCol)
    println(s"StringIndexer will store labels in output column metadata: " +
      s"$Attribute.fromStructField(inputColSchema).toString\\n")
 
    val converter = new IndexToString()
      .setInputCol("categoryIndex")
      .setOutputCol("originalCategory")
 
    val converted = converter.transform(indexed)
 
    println(s"Transformed indexed column '$converter.getInputCol' back to original string " +
      s"column '$converter.getOutputCol' using labels in metadata")
    converted.select("id", "categoryIndex", "originalCategory").show()
  

输出

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+
 
StringIndexer will store labels in output column metadata: "vals":["a","c","b"],"type":"nominal","name":"categoryIndex"
 
Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+

三、VectorIndexer

主要作用:提高决策树或随机森林等ML方法的分类效果。
VectorIndexer是对数据集特征向量中的类别(离散值)特征(index categorical features categorical features )进行编号。
它能够自动判断那些特征是离散值型的特征,并对他们进行编号,具体做法是通过设置一个maxCategories,特征向量中某一个特征不重复取值个数小于maxCategories,则被重新编号为0~K(K<=maxCategories-1)。某一个特征不重复取值个数大于maxCategories,则该特征视为连续值,不会重新编号(不会发生任何改变)。

//定义输入输出列和最大类别数为5,某一个特征  
//(即某一列)中多于5个取值视为连续值  
VectorIndexerModel featureIndexerModel=new VectorIndexer()  
                 .setInputCol("features")  
                 .setMaxCategories(5)  
                 .setOutputCol("indexedFeatures")  
                 .fit(rawData);  
//加入到Pipeline  
Pipeline pipeline=new Pipeline()  
                 .setStages(new PipelineStage[]  
                         labelIndexerModel,  
                         featureIndexerModel,  
                         dtClassifier,  
                         converter);  
pipeline.fit(rawData).transform(rawData).select("features","indexedFeatures").show(20,false);  
//显示如下的结果:          
+-------------------------+-------------------------+  
|features                 |indexedFeatures          |  
+-------------------------+-------------------------+  
|(3,[0,1,2],[2.0,5.0,7.0])|(3,[0,1,2],[2.0,1.0,1.0])|  
|(3,[0,1,2],[3.0,5.0,9.0])|(3,[0,1,2],[3.0,1.0,2.0])|  
|(3,[0,1,2],[4.0,7.0,9.0])|(3,[0,1,2],[4.0,3.0,2.0])|  
|(3,[0,1,2],[2.0,4.0,9.0])|(3,[0,1,2],[2.0,0.0,2.0])|  
|(3,[0,1,2],[9.0,5.0,7.0])|(3,[0,1,2],[9.0,1.0,1.0])|  
|(3,[0,1,2],[2.0,5.0,9.0])|(3,[0,1,2],[2.0,1.0,2.0])|  
|(3,[0,1,2],[3.0,4.0,9.0])|(3,[0,1,2],[3.0,0.0,2.0])|  
|(3,[0,1,2],[8.0,4.0,9.0])|(3,[0,1,2],[8.0,0.0,2.0])|  
|(3,[0,1,2],[3.0,6.0,2.0])|(3,[0,1,2],[3.0,2.0,0.0])|  
|(3,[0,1,2],[5.0,9.0,2.0])|(3,[0,1,2],[5.0,4.0,0.0])|  
+-------------------------+-------------------------+  
结果分析:特征向量包含3个特征,即特征0,特征1,特征2。如Row=1,对应的特征分别是2.0,5.0,7.0.被转换为2.0,1.0,1.0。  
我们发现只有特征1,特征2被转换了,特征0没有被转换。这是因为特征06中取值(234589),多于前面的设置setMaxCategories(5)  
,因此被视为连续值了,不会被转换。  
特征1中,(45679-->(0,1,2,3,4,5)  
特征2,  (2,7,9)-->(0,1,2)  
  
输出DataFrame格式说明(Row=1):  
3个特征 特征012      转换前的值    
|(3,    [0,1,2],      [2.0,5.0,7.0])  
3个特征 特征112       转换后的值  
|(3,    [0,1,2],      [2.0,1.0,1.0])|

离散<->连续特征或Label相互转换

oneHotEncoder

独热编码将类别特征(离散的,已经转换为数字编号形式),映射成独热编码。这样在诸如Logistic回归这样需要连续数值值作为特征输入的分类器中也可以使用类别(离散)特征。

独热编码即 One-Hot 编码,又称一位有效编码,其方法是使用N位 状态寄存器来对N个状态进行编码,每个状态都由他独立的寄存器 位,并且在任意时候,其 中只有一位有效。
例如: 自然状态码为:000,001,010,011,100,101
独热编码为:000001,000010,000100,001000,010000,100000
可以这样理解,对于每一个特征,如果它有m个可能值,那么经过独 热编码后,就变成了m个二元特征。并且,这些特征互斥,每次只有 一个激活。因此,数据会变成稀疏的。
这样做的好处主要有:
解决了分类器不好处理属性数据的问题,在一定程度上也起到了扩充特征的作用

       //onehotencoder前需要转换为string->numerical
        Dataset indexedDf=new StringIndexer()
                        .setInputCol("category")
                        .setOutputCol("indexCategory")
                        .fit(df)
                        .transform(df);
        //对随机分布的类别进行OneHotEncoder,转换后可以当成连续数值输入
        Dataset coderDf=new OneHotEncoder()
                        .setInputCol("indexCategory")
                        .setOutputCol("ont

以上是关于spark ml特征转换操作StringIndexerIndexToStringVectorIndexeroneHotEncoderBucketizerQuantileDiscretizer的主要内容,如果未能解决你的问题,请参考以下文章

将火花管道转换为数据框

如何防止 Azure ML Studio 在导入数据集时将特征列转换为 DateTime

Spark2 ML 学习札记

ML.NET Cookbook:(17)如何在分类数据上训练模型?

ML coursera 提交(第 2 周)特征归一化

如何计算最优的 max_depth 来训练具有大量特征的 ML 模型?