DataFrame 使用 scala 在火花中轻风 DenseMatrix
Posted
技术标签:
【中文标题】DataFrame 使用 scala 在火花中轻风 DenseMatrix【英文标题】:DataFrame to breeze DenseMatrix in spark using scala 【发布时间】:2019-04-04 15:05:13 【问题描述】:我正在尝试使用 scala 将数据帧转换为微风密集矩阵。我找不到任何内置函数来执行此操作,所以这就是我正在做的事情。
import scala.util.Random
import breeze.linalg.DenseMatrix
val featuresDF = (1 to 10)
.map(_ => (
Random.nextDouble,Random.nextDouble,Random.nextDouble))
.toDF("F1", "F2", "F3")
var FeatureArray: Array[Array[Double]] = Array.empty
val features = featuresDF.columns
for(i <- features.indices)
FeatureArray = FeatureArray :+ featuresDF.select(features(i)).collect.map(_(0).toString).map(_.toDouble)
val desnseMat = DenseMatrix(FeatureArray: _*).t
这确实很好,我得到了我想要的。但是,这会在我的环境中导致 OOM 异常。有没有更好的方法来进行这种转换。我的最终目标是使用密集矩阵计算特征的特征值和特征向量。
import breeze.stats.covmat
import breeze.linalg.eig
val covariance = covmat(desnseMat)
val eigen = eig(covariance)
因此,如果有一种直接的方法可以从数据框中获取特征值和特征向量,那就更好了。 spark ml 中的 PCA 必须使用 features 列进行此计算。有没有办法通过 PCA 访问特征值?
【问题讨论】:
【参考方案1】:首先,尝试增加你的内存。
其次,使用 Spark 中的 DenseMatrix 尝试这些功能之一。 这两个函数在我的计算机上使用相同数量的 RAM。
我获得了 1.34 秒来解析 DataFrame 中的 201238 行,其中 1 列每列包含多个 Double 值:
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame
def getDenseMatrixFromDF(featuresDF:DataFrame):DenseMatrix =
val featuresTrain = featuresDF.columns
val rows = featuresDF.count().toInt
val newFeatureArray:Array[Double] = featuresTrain
.indices
.flatMap(i => featuresDF
.select(featuresTrain(i))
.collect())
.map(r => r.toSeq.toArray).toArray.flatten.flatMap(_.asInstanceOf[org.apache.spark.ml.linalg.DenseVector].values)
val newCols = newFeatureArray.length / rows
val denseMat:DenseMatrix = new DenseMatrix(rows, newCols, newFeatureArray, isTransposed=false)
denseMat
如果我想从 DataFrame 中获取 DenseVector,其中一列仅包含一个 Double 值,那么对于相同数量的数据,我得到了 0.8 秒:
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame
def getDenseVectorFromDF(featuresDF:DataFrame):DenseVector =
val featuresTrain = featuresDF.columns
val cols = featuresDF.columns.length
cols match
case i if i>1 => throw new IllegalArgumentException
case _ =>
def addArray(acc:Array[Array[Double]],cur:Array[Double]):Array[Array[Double]] =
acc :+ cur
val newFeatureArray:Array[Double] = featuresTrain
.indices
.flatMap(i => featuresDF
.select(featuresTrain(i))
.collect())
.map(r => r.toSeq.toArray.map(e => e.asInstanceOf[Double])).toArray.flatten
val denseVec:DenseVector = new DenseVector(newFeatureArray)
denseVec
要计算特征值/特征向量,只需检查 this link 和 this API link
计算协方差矩阵检查this link和this API link
【讨论】:
【参考方案2】:def getDenseMatrixFromDF(featuresDF:DataFrame):BDM[Double] =
val featuresTrain = featuresDF.columns
val cols = featuresTrain.length
val rows = featuresDF.count().toInt
val denseMat: BDM[Double] = BDM.tabulate(rows,cols)((i, j)=>
featuresDF.collect().apply(i).getAs[Double](j)
)
denseMat
【讨论】:
请解释您的解决方案,而不是仅仅分享代码以上是关于DataFrame 使用 scala 在火花中轻风 DenseMatrix的主要内容,如果未能解决你的问题,请参考以下文章