Oversampling or SMOTE in Pyspark


【Title】Oversampling or SMOTE in Pyspark 【Posted】: 2019-05-24 23:59:03 【Question】:

I have 7 classes and the total number of records is 115, and I want to run a Random Forest model on this data. But the data is not enough to get a high accuracy. So I want to apply oversampling over all the classes in a way that the majority class itself gets a higher count, and then the minorities accordingly. Is this possible in PySpark?

+---------+-----+
| SubTribe|count|
+---------+-----+
|    Chill|   10|
|     Cool|   18|
|Adventure|   18|
|    Quirk|   13|
|  Mystery|   25|
|    Party|   18|
|Glamorous|   13|
+---------+-----+
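
For reference, plain random oversampling (duplicating existing rows rather than synthesising new ones, i.e. without SMOTE) can be done per class in PySpark with sample(withReplacement=True, ...). A minimal sketch, assuming the data sits in a DataFrame named df with the SubTribe column counted above:

# Rough sketch of per-class random oversampling with replacement (no SMOTE).
# `df` is assumed to be the DataFrame behind the counts shown above.
from functools import reduce

counts = {r['SubTribe']: r['count'] for r in df.groupBy('SubTribe').count().collect()}
target = 2 * max(counts.values())  # grow every class, including the current majority

parts = []
for cls, n in counts.items():
    frac = target / n  # fractions > 1 are allowed when sampling with replacement
    parts.append(df.filter(df['SubTribe'] == cls).sample(withReplacement=True, fraction=frac, seed=42))

oversampled = reduce(lambda a, b: a.union(b), parts)
oversampled.groupBy('SubTribe').count().show()  # counts are approximate, not exact multiples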

【Comments】:

【Answer 1】:

Here is another implementation of Pyspark and Scala SMOTE that I have used in the past. I have copied the code and referenced the source because it is quite small:

Pyspark:

import random
import numpy as np
from pyspark.sql import Row
from sklearn import neighbors
from pyspark.ml.feature import VectorAssembler

def vectorizerFunction(dataInput, TargetFieldName):
    # Assemble every non-target column into a single 'features' vector and rename the target to 'label'
    if(dataInput.select(TargetFieldName).distinct().count() != 2):
        raise ValueError("Target field must have only 2 distinct classes")
    columnNames = list(dataInput.columns)
    columnNames.remove(TargetFieldName)
    dataInput = dataInput.select(columnNames + [TargetFieldName])
    assembler = VectorAssembler(inputCols = columnNames, outputCol = 'features')
    pos_vectorized = assembler.transform(dataInput)
    vectorized = pos_vectorized.select('features', TargetFieldName).withColumn('label', pos_vectorized[TargetFieldName]).drop(TargetFieldName)
    return vectorized

def SmoteSampling(vectorized, k = 5, minorityClass = 1, majorityClass = 0, percentageOver = 200, percentageUnder = 100):
    if(percentageUnder > 100 or percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100")
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be at least 100")
    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]
    # Collect the minority-class feature vectors to the driver and fit a k-NN model on them
    feature = dataInput_min.select('features').rdd.map(lambda x: x[0]).collect()
    feature = np.asarray(feature)
    # k + 1 neighbours because the nearest neighbour of every point is the point itself
    nbrs = neighbors.NearestNeighbors(n_neighbors = k + 1, algorithm = 'auto').fit(feature)
    neighbours = nbrs.kneighbors(feature, return_distance = False)
    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x: list(x))
    min_Array = pos_rddArray.collect()
    newRows = []
    nt = len(min_Array)
    nexs = int(percentageOver / 100)  # number of synthetic examples to create per minority row
    for i in range(nt):
        for j in range(nexs):
            # Pick one of the k nearest neighbours (column 0 is the point itself) and
            # interpolate a synthetic point between the row and that neighbour
            neigh = neighbours[i][random.randint(1, k)]
            difs = min_Array[neigh][0] - min_Array[i][0]
            newRec = min_Array[i][0] + random.random() * difs
            newRows.append(newRec)
    sc = spark.sparkContext  # assumes an active SparkSession named 'spark'
    newData_rdd = sc.parallelize(newRows)
    newData_rdd_new = newData_rdd.map(lambda x: Row(features = x, label = 1))
    new_data = newData_rdd_new.toDF()
    new_data_minor = dataInput_min.union(new_data)
    # Undersample the majority class to the requested percentage
    new_data_major = dataInput_maj.sample(False, float(percentageUnder) / float(100))
    return new_data_major.union(new_data_minor)

dataInput = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load("sam.csv").dropna()
SmoteSampling(vectorizerFunction(dataInput, 'Y'), k = 2, minorityClass = 1, majorityClass = 0, percentageOver = 200, percentageUnder = 90)
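
Not part of the referenced source, but as a hedged follow-on sketch: the DataFrame returned by SmoteSampling already has the features/label columns Spark ML expects, so the random forest mentioned in the question could be trained on it roughly as below (numTrees and the label cast are assumptions, not something the source prescribes):

# Hypothetical continuation: fit the random forest from the question on the rebalanced data
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col

balanced = SmoteSampling(vectorizerFunction(dataInput, 'Y'), k = 2, minorityClass = 1, majorityClass = 0)
balanced = balanced.withColumn('label', col('label').cast('double'))  # classifiers expect a numeric label

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', numTrees = 50, seed = 42)
model = rf.fit(balanced)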

Scala:

// Import the necessary packages
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.expressions.Window
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions._

// Assumes an active SparkSession with spark.implicits._ in scope (for the $"..." column syntax)
object smoteClass {
  // Approximate k-NN via LSH: self-join the minority frame and keep the closest pairs
  def KNNCalculation(
    dataFinal: org.apache.spark.sql.DataFrame,
    feature: String,
    reqrows: Int,
    BucketLength: Int,
    NumHashTables: Int): org.apache.spark.sql.DataFrame = {
      val b1 = dataFinal.withColumn("index", row_number().over(Window.partitionBy("label").orderBy("label")))
      val brp = new BucketedRandomProjectionLSH().setBucketLength(BucketLength).setNumHashTables(NumHashTables).setInputCol(feature).setOutputCol("values")
      val model = brp.fit(b1)
      val transformedA = model.transform(b1)
      val transformedB = model.transform(b1)
      val b2 = model.approxSimilarityJoin(transformedA, transformedB, 2000000000.0)
      require(b2.count > reqrows, "Change bucket length or reduce the percentOver")
      val b3 = b2.selectExpr("datasetA.index as id1",
        s"datasetA.$feature as k1",
        "datasetB.index as id2",
        s"datasetB.$feature as k2",
        "distCol").filter("distCol>0.0").orderBy("id1", "distCol").dropDuplicates().limit(reqrows)
      b3
  }

  // Interpolate a synthetic vector between a point and one of its neighbours
  def smoteCalc(key1: org.apache.spark.ml.linalg.Vector, key2: org.apache.spark.ml.linalg.Vector) = {
    val resArray = Array(key1, key2)
    val res = key1.toArray.zip(key2.toArray.zip(key1.toArray).map(x => x._1 - x._2).map(_ * 0.2)).map(x => x._1 + x._2)
    resArray :+ org.apache.spark.ml.linalg.Vectors.dense(res)
  }

  // inputFrame is expected to contain just the feature-vector column and the label column
  def Smote(
    inputFrame: org.apache.spark.sql.DataFrame,
    feature: String,
    label: String,
    percentOver: Int,
    BucketLength: Int,
    NumHashTables: Int): org.apache.spark.sql.DataFrame = {
      val groupedData = inputFrame.groupBy(label).count
      require(groupedData.count == 2, "Only 2 labels allowed")
      val classAll = groupedData.collect()
      val minorityclass = if (classAll(0)(1).toString.toInt > classAll(1)(1).toString.toInt) classAll(1)(0).toString else classAll(0)(0).toString
      val frame = inputFrame.select(feature, label).where(label + " == " + minorityclass)
      val rowCount = frame.count
      val reqrows = (rowCount * percentOver / 100).toInt
      val md = udf(smoteCalc _)
      val b1 = KNNCalculation(frame, feature, reqrows, BucketLength, NumHashTables)
      val b2 = b1.withColumn("ndtata", md($"k1", $"k2")).select("ndtata")
      val b3 = b2.withColumn("AllFeatures", explode($"ndtata")).select("AllFeatures").dropDuplicates
      val b4 = b3.withColumn(label, lit(minorityclass).cast(frame.schema(1).dataType))
      inputFrame.union(b4).dropDuplicates
  }
}

Source

【Comments】:

github.com/alivcor/SMORK : this one works with the native Spark Transformer API

【Answer 2】:

Maybe this project can be useful for your goal: Spark SMOTE

But I think 115 records are not enough for a random forest. You could use other, simpler techniques, for example a decision tree.

You can check this answer:

Is Random Forest suitable for very small data sets?

【Comments】:

