使用 Spark 并行运行不同的分类器/算法

Posted

技术标签:

【中文标题】使用 Spark 并行运行不同的分类器/算法【英文标题】:Run different classifiers/algorithms in parallel using Spark 【发布时间】:2017-04-04 12:39:19 【问题描述】:

我有一个数据集,我想使用 Spark 和 Python 并行测试不同的分类器。 例如,如果我想测试决策树和随机森林,我该如何并行运行它们?

我尝试了一些方法,但我不断得到:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我正在尝试这样做(使用 scikit-learn 的分类器而不是 Spark 的分类器效果很好:

def apply_classifier(clf, train_dataset, test_dataset):
    model = clf.fit(train_dataset)

    predictions = model.transform(test_dataset)

    evaluator = BinaryClassificationEvaluator()
    evaluator.evaluate(predictions)

    return [(model, predictions)]

...

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

classifiers = [dt, rf]

sc.parallelize(classifiers).flatMap(lambda x: apply_classifier(x, train_dataset, test_dataset)).collect() 

关于如何做到这一点的任何建议?

谢谢!

【问题讨论】:

多模型评估器正在开发中 感谢您的回答@T.Gawęda 所以目前没有办法解决它吗? 我不这么认为。即使您将并行提交作业,它仍然会在集群管理器中排队。但是让我们等一下,也许有人会有一些经过测试的解决方法 - 我只是指出它目前不支持开箱即用,但会在不久的将来支持:) 【参考方案1】:

@larissa-leite

为了克服这个问题,我正在使用 [multiprocessing](https://docs.python.org/3/library/multiprocessing.html),就像在 thread 中解释的那样。

这是线程的代码:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

请解释我为什么使用它:我使用OneVsRestClassifier 训练了几个文本分类器模型(超过 200 个),并且我需要将收到的文本扩展到每个模型。

这里的延迟不到 200 毫秒就可以得到所有的预测(the baseline time reaction 对人类来说可能在 100 毫秒到 420 毫秒之间),所以这个“延迟”对我来说没什么大不了的。

【讨论】:

以上是关于使用 Spark 并行运行不同的分类器/算法的主要内容,如果未能解决你的问题,请参考以下文章

随机森林和adaboost对比

使用 Java 的图像分类算法

改进bagging,成为Boosting!

提升算法

历史上第一个机器学习算法是啥?

使用 Apache Spark 决策树分类器进行多类分类时出错