在 Spark 的 map 函数中运行 ML 算法

Posted

技术标签:

【中文标题】在 Spark 的 map 函数中运行 ML 算法【英文标题】:Run ML algorithm inside map function in Spark 【发布时间】:2017-04-15 16:13:45 【问题描述】:

所以我这几天一直在尝试在 Spark 的地图函数中运行 ML 算法。我发布了一个更具体的question,但引用 Spark 的 ML 算法给了我以下错误:

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

显然我不能在apply_classifier 函数中引用SparkContext。 我的代码类似于我在上一个问题中提出的建议,但仍未找到我正在寻找的解决方案:

def apply_classifier(clf):
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    if clf == 0:
        clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    elif clf == 1:
        clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)

classifiers = [0, 1]

sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect() 

我尝试使用flatMap 代替map,但我得到了NoneType object is not iterable

我还想在apply_classifier 函数中传递一个广播数据集(它是一个DataFrame)作为参数。 最后,有可能做我想做的事吗?有哪些替代方案?

【问题讨论】:

【参考方案1】:

有可能做我想做的事吗?

事实并非如此。 Apache Spark 不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,例如 Spark DataFrame

有什么选择?

这取决于许多因素,例如数据的大小、可用资源的数量和算法的选择。一般来说,您有三种选择:

仅使用 Spark 作为任务管理工具来训练本地、非分布式模型。看起来你已经在某种程度上探索了这条路。有关此方法的更高级实现,您可以查看spark-sklearn

一般来说,当数据相对较小时,这种方法特别有用。它的优点是多个工作之间没有竞争。

使用标准多线程工具从单个上下文提交多个独立作业。例如,您可以使用 threadingjoblib

虽然这种方法是可行的,但我不建议在实践中使用它。并非所有 Spark 组件都是线程安全的,您必须非常小心以避免意外行为。它还使您几乎无法控制资源分配。

参数化您的 Spark 应用程序并使用外部管道管理器(Apache Airflow、Luigi、Toil)提交您的作业。

虽然这种方法有一些缺点(它需要将数据保存到持久存储中),但它也是最通用和最强大的,并且可以对资源分配进行大量控制。

【讨论】:

感谢您的回答。我会检查这些外部管道管理器!

以上是关于在 Spark 的 map 函数中运行 ML 算法的主要内容,如果未能解决你的问题,请参考以下文章

Spark ML下实现的多分类adaboost+naivebayes算法在文本分类上的应用

在 Spark 上训练 Kmeans 算法失败

深入理解Spark ML:基于ALS矩阵分解的协同过滤算法与源码分析

[Spark2.0]ML 调优:模型选择和超参数调优

[Spark2.0]ML 调优:模型选择和超参数调优

Apache Spark:多机器学习算法的并行化