pyspark - 如何交叉验证几种 ML 算法

Posted

技术标签:

【中文标题】pyspark - 如何交叉验证几种 ML 算法【英文标题】:pyspark - how to cross validate several ML algorithms 【发布时间】:2018-04-22 05:28:32 【问题描述】:

我希望能够选择具有最佳参数的最佳拟合算法。 我怎样才能一次完成,而不为每个算法创建很少的管道,并且不对与特定算法无关的参数进行交叉验证检查? 即我想检查逻辑回归对随机森林的执行情况。 我的代码是:

    lr = LogisticRegression().setFamily("multinomial")
    # Chain indexer and tree in a Pipeline
    pipeline = Pipeline(stages=[labelIndexer,labelIndexer2, assembler, lr , labelconverter])

    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1, 0.3, 0.01]) \
        .addGrid(lr.elasticNetParam, [0.1, 0.8, 0.01]) \
        .addGrid(lr.maxIter, [10, 20, 25]) \
        .build()

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=RegressionEvaluator(),
                              numFolds=2)  # use 3+ folds in practice

    # Train model.  This also runs the indexer.
    model = crossval.fit(trainingData)

【问题讨论】:

你找到答案了吗?我也面临同样的问题。 【参考方案1】:

我在 Python/Pyspark 中编写了一个快速而肮脏的解决方法。它有点原始(它没有相应的 Scala 类),我认为它缺乏保存/加载功能,但它可能是您案例的起点。最终它可能会成为 Spark 中的一项新功能,拥有它会很不错。

这个想法是有一个特殊的管道阶段,作为不同对象之间的切换,并维护一个字典来用字符串引用它们。用户可以按名称启用一个或另一个。它们可以是 Estimator、Transformers 或两者兼而有之——用户负责保持管道中的一致性(做有意义的事情,风险自负)。带有启用阶段名称的参数可以包含在要交叉验证的网格中。

from pyspark.ml.wrapper import JavaEstimator
from pyspark.ml.base import Estimator, Transformer, Param, Params, TypeConverters

class PipelineStageChooser(JavaEstimator):
    
    selectedStage = Param(Params._dummy(), "selectedStage", "key of the selected stage in the dict",
                      typeConverter=TypeConverters.toString)

    stagesDict = None
    _paramMap = 

    def __init__(self, stagesDict, selectedStage):
        super(PipelineStageChooser, self).__init__()
        self.stagesDict = stagesDict
        if selectedStage not in self.stagesDict.keys():
            raise KeyError("selected stage 0 not found in stagesDict".format(selectedStage)) 

        if isinstance(self.stagesDict[selectedStage], Transformer):       
            self.fittedSelectedStage = self.stagesDict[selectedStage]

        for stage in stagesDict.values():
            if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
                raise TypeError("Cannot recognize a pipeline stage of type %s." % type(stage))     
        
        self._set(selectedStage=selectedStage)
        self._java_obj = None

    def fit(self, dataset, params=None): 
        selectedStage_str = self.getOrDefault(self.selectedStage)
        if isinstance(self.stagesDict[selectedStage_str], Estimator):
            return self.stagesDict[selectedStage_str].fit(dataset, params = params)
        elif isinstance(self.stagesDict[selectedStage_str], Transformer):
            return self.stagesDict[selectedStage_str]

使用示例:

count_vectorizer = CountVectorizer() # set params
hashing_tf = HashingTF() # set params
chooser = PipelineStageChooser(stagesDict="count_vectorizer": count_vectorizer, 
                                           "hashing_tf": hashing_tf,
                               selectedStage="count_vectorizer")

pipeline = Pipeline(stages = [chooser])

# Test which among CountVectorizer or HashingTF works better to create features 
# Could be used as well to decide between different ML algorithms
paramGrid = ParamGridBuilder() \
    .addGrid(chooser.selectedStage, ["count_vectorizer", "hashing_tf"])\
    .build()

【讨论】:

以上是关于pyspark - 如何交叉验证几种 ML 算法的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 线性回归梯度下降交叉验证

如何从 PySpark 中的 spark.ml 中提取模型超参数?

PySpark 中的分层交叉验证

PySpark ML:LinearSVC 的 OnevsRest 策略

Pyspark交叉验证后如何获得最佳超参数值?

交叉验证