火花火车测试拆分
Posted
技术标签:
【中文标题】火花火车测试拆分【英文标题】:Spark train test split 【发布时间】:2017-02-21 00:26:42 【问题描述】:我很好奇在最新的 2.0.1 版本中是否有类似于 sklearn 的 http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.StratifiedShuffleSplit.html 用于 apache-spark 的东西。
到目前为止,我只能找到https://spark.apache.org/docs/latest/mllib-statistics.html#stratified-sampling,这似乎不太适合将严重不平衡的数据集拆分为训练/测试样本。
【问题讨论】:
参见Example: model selection via train validation split TrainValidationSplit 创建单个(训练、测试)数据集对。它使用 trainRatio 参数将数据集分为这两部分。 谢谢。我不知道那个。但是, TrainValidationSplit 似乎既不是随机的,也不是支持已开始的拆分。我在这里错过了什么吗? 你说得对,有一张关于此 Support balanced class labels when splitting train/cross validation sets 的 Jira Ticket。所以Mllib还不支持这个功能 你知道在合并之前有一个体面的工作吗? 你已经看过这个answer了吗? 【参考方案1】:虽然这个答案不是特定于 Spark,但在 Apache Beam 中,我这样做是为了拆分 66% 的训练和 33% 的测试(只是一个说明性示例,您可以自定义下面的 partition_fn 以使其更复杂并接受诸如指定桶的数量或偏向某物的选择或确保随机化在各个维度上是公平的等):
raw_data = p | 'Read Data' >> Read(...)
clean_data = (raw_data
| "Clean Data" >> beam.ParDo(CleanFieldsFn())
def partition_fn(element):
return random.randint(0, 2)
random_buckets = (clean_data | beam.Partition(partition_fn, 3))
clean_train_data = ((random_buckets[0], random_buckets[1])
| beam.Flatten())
clean_eval_data = random_buckets[2]
【讨论】:
【参考方案2】:假设我们有一个这样的数据集:
+---+-----+
| id|label|
+---+-----+
| 0| 0.0|
| 1| 1.0|
| 2| 0.0|
| 3| 1.0|
| 4| 0.0|
| 5| 1.0|
| 6| 0.0|
| 7| 1.0|
| 8| 0.0|
| 9| 1.0|
+---+-----+
这个数据集是完美平衡的,但这种方法也适用于不平衡的数据。
现在,让我们用额外的信息来扩充这个 DataFrame,这些信息对于决定哪些行应该进入训练集很有用。步骤如下:
在给定一些ratio
的情况下,确定每个标签的多少示例应该是训练集的一部分。
随机排列 DataFrame 的行。
使用窗口函数按label
对DataFrame 进行分区和排序,然后使用row_number()
对每个标签的观察结果进行排名。
我们最终得到以下数据框:
+---+-----+----------+
| id|label|row_number|
+---+-----+----------+
| 6| 0.0| 1|
| 2| 0.0| 2|
| 0| 0.0| 3|
| 4| 0.0| 4|
| 8| 0.0| 5|
| 9| 1.0| 1|
| 5| 1.0| 2|
| 3| 1.0| 3|
| 1| 1.0| 4|
| 7| 1.0| 5|
+---+-----+----------+
注意:行被打乱(参见:id
列中的随机顺序),按标签分区(参见:label
列)并排名。
假设我们想要拆分 80%。在这种情况下,我们希望四个1.0
标签和四个0.0
标签用于训练数据集,一个1.0
标签和一个0.0
标签用于测试数据集。我们在row_number
列中有这些信息,所以现在我们可以简单地在用户定义的函数中使用它(如果row_number
小于或等于四,则示例转到训练集)。
应用UDF后,得到的数据框如下:
+---+-----+----------+----------+
| id|label|row_number|isTrainSet|
+---+-----+----------+----------+
| 6| 0.0| 1| true|
| 2| 0.0| 2| true|
| 0| 0.0| 3| true|
| 4| 0.0| 4| true|
| 8| 0.0| 5| false|
| 9| 1.0| 1| true|
| 5| 1.0| 2| true|
| 3| 1.0| 3| true|
| 1| 1.0| 4| true|
| 7| 1.0| 5| false|
+---+-----+----------+----------+
现在,要获得训练/测试数据,我们必须这样做:
val train = df.where(col("isTrainSet") === true)
val test = df.where(col("isTrainSet") === false)
对于一些非常大的数据集,这些排序和分区步骤可能会令人望而却步,因此我建议首先尽可能过滤数据集。实物图如下:
== Physical Plan ==
*(3) Project [id#4, label#5, row_number#11, if (isnull(row_number#11)) null else UDF(label#5, row_number#11) AS isTrainSet#48]
+- Window [row_number() windowspecdefinition(label#5, label#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#11], [label#5], [label#5 ASC NULLS FIRST]
+- *(2) Sort [label#5 ASC NULLS FIRST, label#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(label#5, 200)
+- *(1) Project [id#4, label#5]
+- *(1) Sort [_nondeterministic#9 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(_nondeterministic#9 ASC NULLS FIRST, 200)
+- LocalTableScan [id#4, label#5, _nondeterministic#9
这是完整的工作示例(使用 Spark 2.3.0 和 Scala 2.11.12 测试):
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.functions.col, row_number, udf, rand
class StratifiedTrainTestSplitter
def getNumExamplesPerClass(ss: SparkSession, label: String, trainRatio: Double)(df: DataFrame): Map[Double, Long] =
df.groupBy(label).count().createOrReplaceTempView("labelCounts")
val query = f"SELECT $label AS ratioLabel, count, cast(count * $trainRatio as long) AS trainExamples FROM labelCounts"
import ss.implicits._
ss.sql(query)
.select("ratioLabel", "trainExamples")
.map((r: Row) => r.getDouble(0) -> r.getLong(1))
.collect()
.toMap
def split(df: DataFrame, label: String, trainRatio: Double): DataFrame =
val w = Window.partitionBy(col(label)).orderBy(col(label))
val rowNumPartitioner = row_number().over(w)
val dfRowNum = df.sort(rand).select(col("*"), rowNumPartitioner as "row_number")
dfRowNum.show()
val observationsPerLabel: Map[Double, Long] = getNumExamplesPerClass(df.sparkSession, label, trainRatio)(df)
val addIsTrainColumn = udf((label: Double, rowNumber: Int) => rowNumber <= observationsPerLabel(label))
dfRowNum.withColumn("isTrainSet", addIsTrainColumn(col(label), col("row_number")))
object StratifiedTrainTestSplitter
def getDf(ss: SparkSession): DataFrame =
val data = Seq(
(0, 0.0), (1, 1.0), (2, 0.0), (3, 1.0), (4, 0.0), (5, 1.0), (6, 0.0), (7, 1.0), (8, 0.0), (9, 1.0)
)
ss.createDataFrame(data).toDF("id", "label")
def main(args: Array[String]): Unit =
val spark: SparkSession = SparkSession
.builder()
.config(new SparkConf().setMaster("local[1]"))
.getOrCreate()
val df = new StratifiedTrainTestSplitter().split(getDf(spark), "label", 0.8)
df.cache()
df.where(col("isTrainSet") === true).show()
df.where(col("isTrainSet") === false).show()
注意:在这种情况下,标签是Double
s。如果您的标签是 String
s,您将不得不在这里和那里切换类型。
【讨论】:
【参考方案3】:Spark 支持https://s3.amazonaws.com/sparksummit-share/ml-ams-1.0.1/6-sampling/scala/6-sampling_student.html 中所述的分层样本
df.stat.sampleBy("label", Map(0 -> .10, 1 -> .20, 2 -> .3), 0)
【讨论】:
如何使用 sampleBy 进行火车测试拆分。我的看法是,如果你有两行,train = df.stat.sameplyBy...
test=df.state.sameplBy...
那么样本可能有重复的记录。除非我错过了什么?
在得到train
和SampleBy
之后,leftanti
加入原来的df
到train
,你就有了test
@Jomonsugi 这很有帮助。我也发现 sampleBy 没有用,因为样本与拆分不同。但这是解决方案。我应该补充一点,如果您有一个准备好的数据集,那么您将需要添加一个键列来获得该连接。【参考方案4】:
当 OP 发布此问题时,此方法可能不可用,但我将其留在这里以供将来参考:
# splitting dataset into train and test set
train, test = df.randomSplit([0.7, 0.3], seed=42)
【讨论】:
但这不会像 OP 要求的那样分层,对吧?此外,元组中缺少逗号 (?) 是的,你让我到了那里。 谢谢!仍然注意 OP en.wikipedia.org/wiki/Stratified_sampling 要求的“分层”抽样(不是随机的!) 如果您打算使用 RandomSplit,不妨看看这篇文章:medium.com/udemy-engineering/…以上是关于火花火车测试拆分的主要内容,如果未能解决你的问题,请参考以下文章