在 Spark 的 map 函数中运行 ML 算法
Posted
技术标签:
【中文标题】在 Spark 的 map 函数中运行 ML 算法【英文标题】:Run ML algorithm inside map function in Spark 【发布时间】:2017-04-15 16:13:45 【问题描述】:所以我这几天一直在尝试在 Spark 的地图函数中运行 ML 算法。我发布了一个更具体的question,但引用 Spark 的 ML 算法给了我以下错误:
AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?
显然我不能在apply_classifier
函数中引用SparkContext
。
我的代码类似于我在上一个问题中提出的建议,但仍未找到我正在寻找的解决方案:
def apply_classifier(clf):
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
if clf == 0:
clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
elif clf == 1:
clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)
classifiers = [0, 1]
sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect()
我尝试使用flatMap
代替map
,但我得到了NoneType object is not iterable
。
我还想在apply_classifier
函数中传递一个广播数据集(它是一个DataFrame)作为参数。
最后,有可能做我想做的事吗?有哪些替代方案?
【问题讨论】:
【参考方案1】:有可能做我想做的事吗?
事实并非如此。 Apache Spark 不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,例如 Spark DataFrame
。
有什么选择?
这取决于许多因素,例如数据的大小、可用资源的数量和算法的选择。一般来说,您有三种选择:
仅使用 Spark 作为任务管理工具来训练本地、非分布式模型。看起来你已经在某种程度上探索了这条路。有关此方法的更高级实现,您可以查看spark-sklearn
。
一般来说,当数据相对较小时,这种方法特别有用。它的优点是多个工作之间没有竞争。
使用标准多线程工具从单个上下文提交多个独立作业。例如,您可以使用 threading
或 joblib
。
虽然这种方法是可行的,但我不建议在实践中使用它。并非所有 Spark 组件都是线程安全的,您必须非常小心以避免意外行为。它还使您几乎无法控制资源分配。
参数化您的 Spark 应用程序并使用外部管道管理器(Apache Airflow、Luigi、Toil)提交您的作业。
虽然这种方法有一些缺点(它需要将数据保存到持久存储中),但它也是最通用和最强大的,并且可以对资源分配进行大量控制。
【讨论】:
感谢您的回答。我会检查这些外部管道管理器!以上是关于在 Spark 的 map 函数中运行 ML 算法的主要内容,如果未能解决你的问题,请参考以下文章
Spark ML下实现的多分类adaboost+naivebayes算法在文本分类上的应用