使用 Spark 并行运行不同的分类器/算法
Posted
技术标签:
【中文标题】使用 Spark 并行运行不同的分类器/算法【英文标题】:Run different classifiers/algorithms in parallel using Spark 【发布时间】:2017-04-04 12:39:19 【问题描述】:我有一个数据集,我想使用 Spark 和 Python 并行测试不同的分类器。 例如,如果我想测试决策树和随机森林,我该如何并行运行它们?
我尝试了一些方法,但我不断得到:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我正在尝试这样做(使用 scikit-learn 的分类器而不是 Spark 的分类器效果很好:
def apply_classifier(clf, train_dataset, test_dataset):
model = clf.fit(train_dataset)
predictions = model.transform(test_dataset)
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
return [(model, predictions)]
...
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
classifiers = [dt, rf]
sc.parallelize(classifiers).flatMap(lambda x: apply_classifier(x, train_dataset, test_dataset)).collect()
关于如何做到这一点的任何建议?
谢谢!
【问题讨论】:
多模型评估器正在开发中 感谢您的回答@T.Gawęda 所以目前没有办法解决它吗? 我不这么认为。即使您将并行提交作业,它仍然会在集群管理器中排队。但是让我们等一下,也许有人会有一些经过测试的解决方法 - 我只是指出它目前不支持开箱即用,但会在不久的将来支持:) 【参考方案1】:@larissa-leite
为了克服这个问题,我正在使用 [multiprocessing](https://docs.python.org/3/library/multiprocessing.html)
,就像在 thread 中解释的那样。
这是线程的代码:
from multiprocessing import Process
def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'
def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'
if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
请解释我为什么使用它:我使用OneVsRestClassifier 训练了几个文本分类器模型(超过 200 个),并且我需要将收到的文本扩展到每个模型。
这里的延迟不到 200 毫秒就可以得到所有的预测(the baseline time reaction 对人类来说可能在 100 毫秒到 420 毫秒之间),所以这个“延迟”对我来说没什么大不了的。
【讨论】:
以上是关于使用 Spark 并行运行不同的分类器/算法的主要内容,如果未能解决你的问题,请参考以下文章