sc.parallelize 不能在训练算法的 ML 管道中工作
Posted
技术标签:
【中文标题】sc.parallelize 不能在训练算法的 ML 管道中工作【英文标题】:sc.parallelize not working in the ML pipeline with the training algorithm 【发布时间】:2016-05-31 14:25:56 【问题描述】:使用 org.apache.spark.mllib 学习算法,我们过去在没有训练算法的情况下设置管道
var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler
val pipeline = new Pipeline().setStages(stages)
然后我们使用 LabeledPoint 为训练算法准备好数据,最后我们用类似的东西训练模型
val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy)
我们必须注意,如果我们不使用 "sc.parallelize",训练似乎永远不会结束。
现在有了 org.apache.spark.ml 学习算法(由于 setLabelCol 和 setFeaturesCol),我们可以将训练算法也包含在管道中
val model = new GBTRegressor().setLabelCol(target_col_name).setFeaturesCol("features").setMaxIter(10)
var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler :+ model
val pipeline = new Pipeline().setStages(stages)
但是现在当我们传递数据时,它除了一个数据框,而不是 sc.parallelize 所做的数据行 所以下面的代码
val model = pipeline.fit(sc.parallelize(df_train))
抛出以下错误:
<console>:57: error: type mismatch;
found : org.apache.spark.sql.DataFrame
required: Seq[?]
此时
val model = pipeline.fit(df_train)
永无止境。
这个问题的解决方法是什么?
【问题讨论】:
使用数据框toDF
从 RDD 转换到数据框并返回非常容易 - 你只需要添加一个模式(如标题)。
@GameOfThrows:对不起,我没明白。 df_train 已经是一个 DataFrame。
【参考方案1】:
您的代码的主要问题是您将驱动程序用作数据的桥梁。即您正在收集所有分布式数据到您的驱动程序并将其传递回您的所有节点。另一个问题是您实际上正在使用ML
功能,这意味着您必须 使用DataFrame
s 而不是RDD
s。因此,您需要做的是将您的RDD
转换为DataFrame
。注意,有很多方法可以实现,可以查看How to convert RDD Object to DataFrame in Spark,还有一种方法是使用toDF方法。
【讨论】:
":type df_train" 给了我“org.apache.spark.sql.DataFrame”。所以上面的错误说“df_train”是一个DataFrame而不是一个RDD。对不起,如果我在这里遗漏了什么,你能帮我多一点吗? @Abhishek 是的,但是当您将其collect
它并行化后,您将其转换为 Seq
,然后再转换为 RDD
。
嘿,我明白了,但我仍然不确定如何解决这个问题,你能帮我提供代码建议吗?抱歉,我是 Scala 和 ML 算法的新手。
我仍然相信你没有得到我的问题,我怀疑是否将 sc.parallelize 与 ML 算法一起使用,该算法将 Dataframe 作为输入,而 sc.parallelize 将 RDD 作为输出。
是的,但是 ML 函数接收 DataFrames 并且只接收它们作为参数,因此您必须将 RDD 转换为 DataFrame,可能使用 toDF。以上是关于sc.parallelize 不能在训练算法的 ML 管道中工作的主要内容,如果未能解决你的问题,请参考以下文章