如何将 spark DataFrame 转换为 RDD mllib LabeledPoints?
Posted
技术标签:
【中文标题】如何将 spark DataFrame 转换为 RDD mllib LabeledPoints?【英文标题】:How to convert spark DataFrame to RDD mllib LabeledPoints? 【发布时间】:2016-06-28 06:35:05 【问题描述】:我尝试将 PCA 应用于我的数据,然后将 RandomForest 应用于转换后的数据。但是,PCA.transform(data) 给了我一个 DataFrame,但我需要一个 mllib LabeledPoints 来喂我的 RandomForest。我怎样才能做到这一点? 我的代码:
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2")
val splits = dataset.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
val trainingDf = trainingData.toDF()
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(100)
.fit(trainingDf)
val pcaTrainingData = pca.transform(trainingDf)
val numClasses = 10
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 20
val maxBins = 32
val model = RandomForest.trainClassifier(pcaTrainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
error: type mismatch;
found : org.apache.spark.sql.DataFrame
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]
我尝试了以下两种可能的解决方案,但没有奏效:
scala> val pcaTrainingData = trainingData.map(p => p.copy(features = pca.transform(p.features)))
<console>:39: error: overloaded method value transform with alternatives:
(dataset: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame <and>
(dataset: org.apache.spark.sql.DataFrame,paramMap: org.apache.spark.ml.param.ParamMap)org.apache.spark.sql.DataFrame <and>
(dataset: org.apache.spark.sql.DataFrame,firstParamPair: org.apache.spark.ml.param.ParamPair[_],otherParamPairs: org.apache.spark.ml.param.ParamPair[_]*)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.mllib.linalg.Vector)
还有:
val labeled = pca
.transform(trainingDf)
.map(row => LabeledPoint(row.getDouble(0), row(4).asInstanceOf[Vector[Int]]))
error: type mismatch;
found : scala.collection.immutable.Vector[Int]
required: org.apache.spark.mllib.linalg.Vector
(在上述情况下我已经导入了 org.apache.spark.mllib.linalg.Vectors)
有什么帮助吗?
【问题讨论】:
您的代码对我来说工作得很好(原样,没有两次解决方案尝试)。我猜也许你有一个进口错误?我正在使用import org.apache.spark.ml.feature.PCA
、import org.apache.spark.mllib.util.MLUtils
。我用这个文件运行它:csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/…
@TzachZohar 哦,我有和你一样的导入,我通过添加它们来编辑我的问题。我也使用了相同的数据文件。是不是因为我在 shell 中运行而不是 spark-submit 所以它不起作用?
为什么所有的反对票?似乎是一个合理的问题。
【参考方案1】:
这里的正确方法是您尝试的第二种方法 - 将每个 Row
映射到 LabeledPoint
以获得 RDD[LabeledPoint]
。但是,它有两个错误:
-
正确的
Vector
类 (org.apache.spark.mllib.linalg.Vector
) 不采用类型参数(例如 Vector[Int]
) - 因此,即使您有正确的导入,编译器仍会得出结论,您的意思是 scala.collection.immutable.Vector
确实如此。
PCA.fit()
返回的 DataFrame 有 3 列,您尝试提取第 4 列。例如,显示前 4 行:
+-----+--------------------+--------------------+
|label| features| pcaFeatures|
+-----+--------------------+--------------------+
| 5.0|(780,[152,153,154...|[880.071111851977...|
| 1.0|(780,[158,159,160...|[-41.473039034112...|
| 2.0|(780,[155,156,157...|[931.444898405036...|
| 1.0|(780,[124,125,126...|[25.5114585648411...|
+-----+--------------------+--------------------+
为了使这更容易 - 我更喜欢使用列 names 而不是它们的索引。
所以这是你需要的转换:
val labeled = pca.transform(trainingDf).rdd.map(row => LabeledPoint(
row.getAs[Double]("label"),
row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")
))
【讨论】:
以上是关于如何将 spark DataFrame 转换为 RDD mllib LabeledPoints?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 spark DataFrame 转换为 RDD mllib LabeledPoints?
如何将 BigQuery SQL 查询结果转换为 Spark DataFrame?
如何将 Scala Spark Dataframe 转换为 LinkedHashMap[String, String]
如何将 Spark Dataframe 列转换为字符串数组的单列