Apache Spark StringIndexer applies non-existent labels (Unseen label Exception)

Posted: 2019-07-14 02:16:30

Problem description:

I am trying to run a random forest classification with PySpark 2.3.0. My dataset contains three string columns, so I convert them to numbers with StringIndexer. Unfortunately, during evaluation the indexer suddenly reports a label that does not exist anywhere in the dataset.

Here is an excerpt of my dataset (the last column is the 0/1 label):

Year,Month,DayofMonth,DayOfWeek,DepTime,UniqueCarrier,Origin,Dest,Distance,DepDelay15Min
2004,1,12,1,623,UA,ORD,CLT,599,0
2004,1,13,2,621,UA,ORD,CLT,599,0
2004,1,14,3,633,UA,ORD,CLT,599,0

Here is my script:

CSV_PATH = "data/mllib/2004_10000_small.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 10
RF_MAX_DEPTH = 30
RF_MAX_BINS = 2048
LABEL = "DepDelay15Min"
CATEGORICAL_FEATURES = ["UniqueCarrier", "Origin", "Dest"]

from pyspark import SparkContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SparkSession
from time import *

# Creates Spark Session
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Reads in CSV file as DataFrame
# header: The first line of files are used to name columns and are not included in data. All types are assumed to be string.
# inferSchema: Automatically infer column types. It requires one extra pass over the data.
df = spark.read.options(header = "true", inferschema = "true").csv(CSV_PATH)

# Transforms all strings into indexed numbers
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in CATEGORICAL_FEATURES]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Removes old string columns
df = df.drop(*CATEGORICAL_FEATURES)

# Moves the label to the last column
df = StringIndexer(inputCol=LABEL, outputCol=LABEL+"_label").fit(df).transform(df)
df = df.drop(LABEL)

# Converts the DataFrame into a LabeledPoint Dataset with the last column being the label and the rest the features.
transformed_df = df.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[0:-1])))

# Splits the dataset into a training and testing set according to the defined ratio using the defined random seed.
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())

# Run algorithm and measure runtime
start_time = time()

model = RandomForest.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={}, numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", impurity="gini", maxDepth=RF_MAX_DEPTH, maxBins=RF_MAX_BINS, seed=RANDOM_SEED)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

# Make predictions and compute accuracy
predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)
acc = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Model accuracy: %.3f%%" % (acc * 100))

When labels_and_predictions.filter() is executed at the end, I get the following error message:

Caused by: org.apache.spark.SparkException: Unseen label: OR.  To handle unseen labels, set Param handleInvalid to keep.
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:260)

However, the label "OR" does not exist anywhere in the dataset, only "ORD". I tried different datasets and it turns out that Spark keeps cutting off the last letter of the "Origin" values. I have no idea which part of the script could be responsible for this. Any ideas how I should investigate this? Thanks in advance!
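
As the exception message itself suggests, the indexers can be told to keep unseen labels instead of failing. A minimal sketch of that workaround, applied to the indexers from the script above (it only masks the symptom and does not explain where "OR" comes from):

# Hypothetical workaround, not part of the original script: handleInvalid="keep"
# assigns unseen labels an extra index instead of raising an exception.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index",
                          handleInvalid="keep").fit(df)
            for column in CATEGORICAL_FEATURES]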

Comments:

I'm not sure what is causing it, but I suspect it might be the LabeledPoint dataset conversion. Is there a reason you are not using the Spark ML library and VectorAssembler?

@Erik No particular reason other than that I did not know it could be done differently. What advantage does VectorAssembler have over the LabeledPoint conversion? And what do you mean by not using Spark ML? I thought that is exactly what I am doing (or at least pyspark.ml calls spark.ml underneath)? Could you point me to an example that uses VectorAssembler? Thanks!

It looks like you are using pyspark.mllib rather than pyspark.ml. MLlib is the RDD-based ML library, while ML is the DataFrame-based one. Instead of squeezing everything into a LabeledPoint conversion and dropping all the intermediate columns, you can use pyspark.ml and work with DataFrames directly. The advantage is that you do not have to use RDDs (which are generally slower than DataFrames and more error-prone), and you can combine everything in a Pipeline, which makes scoring easier. See spark.apache.org/docs/latest/ml-features.html#vectorassembler

@Erik Thanks a lot for the suggestion! I still do not understand why my solution did not work, but after porting it to the ML library it works just fine. I will post my new script here as well.
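
A minimal sketch of the VectorAssembler approach suggested above, assuming the DataFrame df produced by the question's script (names such as assembled_df are illustrative and not taken from the original post):

from pyspark.ml.feature import VectorAssembler

# Assemble every column except the indexed label into one vector column,
# staying in the DataFrame API instead of mapping to an RDD of LabeledPoints.
feature_cols = [c for c in df.columns if c != LABEL + "_label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(df).select("features", LABEL + "_label")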

Answer 1:

As Erik pointed out, I was using the outdated MLlib instead of the ML library. I still don't understand why the original script did not work properly, but after porting it to ML it does. Here is the new solution, inspired by this example: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

CSV_PATH = "data/mllib/2004_10000_small.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAININGDATA_RATIO = 0.7
VI_MAX_CATEGORIES = 4
RF_NUM_TREES = 10
RF_MAX_DEPTH = 30
RF_MAX_BINS = 2048
LABEL = "DepDelay15Min"
CATEGORICAL_FEATURES = ["UniqueCarrier", "Origin", "Dest"]

from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.sql import SparkSession
from time import *

# Creates Spark Session
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Reads in CSV file as DataFrame
# header: The first line of files are used to name columns and are not included in data. All types are assumed to be string.
# inferSchema: Automatically infer column types. It requires one extra pass over the data.
data = spark.read.options(header = "true", inferschema = "true").csv(CSV_PATH)

# Transforms all string features into indexed numbers
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data) for column in CATEGORICAL_FEATURES]
pipeline = Pipeline(stages=indexers)
data = pipeline.fit(data).transform(data)

# Removes old string columns
data = data.drop(*CATEGORICAL_FEATURES)

# Indexes the label and moves it to the last column
data = StringIndexer(inputCol=LABEL, outputCol="label").fit(data).transform(data)
data = data.drop(LABEL)

# Assembles all feature columns and moves them to the last column
assembler = VectorAssembler(inputCols=data.columns[0:-1], outputCol="features")
data = assembler.transform(data)

# Remove all columns but label and features
data = data.drop(*data.columns[0:-2])

# Splits the dataset into a training and testing set according to the defined ratio using the defined random seed.
splits = [TRAININGDATA_RATIO, 1.0 - TRAININGDATA_RATIO]
trainingData, testData = data.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % trainingData.count())
print("Number of test set rows: %d" % testData.count())

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > VI_MAX_CATEGORIES distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=VI_MAX_CATEGORIES).fit(data)

# Train a RandomForest model.
randomForest = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES, maxBins=RF_MAX_BINS)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, randomForest, labelConverter])

# Train model.  This also runs the indexers. Measures the execution time as well.
start_time = time()
model = pipeline.fit(trainingData)
end_time = time()

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
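
One possible follow-up, sketched here under the assumptions of the script above (names such as full_pipeline are illustrative and not part of the answer): as the comments point out, the StringIndexers and the VectorAssembler can live inside the same Pipeline as the classifier, so raw rows can be scored without the manual pre-processing steps.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler

raw = spark.read.options(header="true", inferSchema="true").csv(CSV_PATH)

# Index the categorical feature columns; "keep" guards against values that
# only appear in the test split.
stages = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
          for c in CATEGORICAL_FEATURES]
# Index the label column.
stages.append(StringIndexer(inputCol=LABEL, outputCol="label"))
# Assemble numeric and indexed categorical columns into one feature vector.
numeric_cols = ["Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "Distance"]
feature_cols = numeric_cols + [c + "_index" for c in CATEGORICAL_FEATURES]
stages.append(VectorAssembler(inputCols=feature_cols, outputCol="features"))
stages.append(RandomForestClassifier(labelCol="label", featuresCol="features",
                                     numTrees=RF_NUM_TREES, maxDepth=RF_MAX_DEPTH,
                                     maxBins=RF_MAX_BINS, seed=RANDOM_SEED))
full_pipeline = Pipeline(stages=stages)

trainingData, testData = raw.randomSplit([TRAININGDATA_RATIO, 1.0 - TRAININGDATA_RATIO], RANDOM_SEED)
full_model = full_pipeline.fit(trainingData)   # fits the indexers and the forest together
scored = full_model.transform(testData)        # accepts raw, un-indexed rows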
