将稀疏特征向量分解为单独的列

Posted

技术标签:

【中文标题】将稀疏特征向量分解为单独的列【英文标题】:Explode sparse features vector into separate columns 【发布时间】:2018-01-30 14:43:15 【问题描述】:

在我的 spark DataFrame 中,我有一列包含 CountVectoriser 转换的输出 - 它是稀疏向量格式。我想要做的是再次将此列“分解”成一个密集向量,然后它是组件行(以便它可以用于外部模型的评分)。

我知道该列中有 40 个特征,因此按照this 示例,我尝试了:

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// convert sparse vector to a dense vector, and then to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until 39).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)

但是,我收到了奇怪的错误(请参阅下面的完整错误):

data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;

现在看来,也许 CountVectoriser 创建了一个“ml.linalg.Vector”类型的向量,所以我也尝试过导入:

import org.apache.spark.ml.linalg.Vector, DenseVector, SparseVector

然后我得到一个错误原因:

Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row

我还尝试通过将 UDF 更改为来转换 ml 向量:

val vecToSeq = udf((v: Vector) =>  org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )

并得到类似的cannot be cast to org.apache.spark.sql.Row 错误。谁能告诉我为什么这不起作用?有没有更简单的方法将 DataFrame 中的稀疏向量分解为单独的列?我在这上面花了好几个小时都搞不明白。

编辑:模式将特征列显示为向量:

  |-- features: vector (nullable = true)

完整的错误跟踪:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . . 
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
        at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
        at 

【问题讨论】:

你能显示架构吗? categorisation_split_vec 是什么? 添加了架构。 categorisation_split_vecfeatures 列的实际名称,为了简单起见,我将其重命名(虽然不是到处 - 现在已修复) 【参考方案1】:

在处理此类案例时,我经常会一步一步分解,以了解问题出在哪里。

首先,让我们设置一个数据框:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
val df=sc.parallelize(Seq((1L, Seq("word1", "word2")))).toDF("id", "words")
val countModel = new CountVectorizer().setInputCol("words").setOutputCol("feature").fit(df)
val testDF = countModel.transform(df)
testDF.show

+---+--------------+-------------------+
| id|         words|            feature|
+---+--------------+-------------------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|
+---+--------------+-------------------+

现在,我要做的是选择,比如说特征的第一列,也就是提取feature向量的第一个坐标。

这可以写成:v(0)。 现在我希望我的数据框有一个包含v(0) 的列,其中vfeature 列的内容。我可以为此使用用户定义函数:

val firstColumnExtractor = udf((v: Vector) => v(0))

我尝试将此列添加到我的testDF

testDF.withColumn("feature_0", firstColumnExtractor($"feature")).show
+---+--------------+-------------------+---------+                              
| id|         words|            feature|feature_0|
+---+--------------+-------------------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|
+---+--------------+-------------------+---------+

请注意,我也可以这样做(据我所知,这只是风格问题):

testDF.select(firstColumnExtractor($"feature").as("feature_0")).show

这行得通,但要重复很多工作。让我们自动化。 首先,我可以概括提取函数以在任何索引处工作。让我们创建一个高阶函数(创建函数的函数)

def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))

现在,我可以重写前面的例子了:

testDF.withColumn("feature_0", columnExtractor(0)($"feature")).show

好的,现在我可以这样做了:

testDF.withColumn("feature_0", columnExtractor(0)($"feature"))
      .withColumn("feature_1", columnExtractor(1)($"feature"))

这适用于 1,但是 39 维呢?好吧,让我们再自动化一些。以上确实是每个维度上的foldLeft 操作:

(0 to 39).foldLeft(testDF)((df, idx) => df.withColumn("feature_"+idx, columnExtractor(idx)($"feature")))

这只是编写多选函数的另一种方式

val featureCols = (0 to 1).map(idx => columnExtractor(idx)($"feature").as("feature_"+idx))
testDF.select((col("*") +: featureCols):_*).show
+---+--------------+-------------------+---------+---------+
| id|         words|            feature|feature_0|feature_1|
+---+--------------+-------------------+---------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|      1.0|
+---+--------------+-------------------+---------+---------+

现在,出于性能原因,您可能希望将基本 Vector 转换为坐标数组(或 DenseVector)。随意这样做。我觉得DenseVectorArray 在性能方面可能非常接近,所以我会这样写:

// A function to densify the feature vector
val toDense = udf((v:Vector) => v.toDense)
// Replase testDF's feature column with its dense equivalent
val denseDF = testDF.withColumn("feature", toDense($"feature"))
// Work on denseDF as we did on testDF 
denseDF.select((col("*") +: featureCols):_*).show

【讨论】:

这是一个绝妙的答案——它确实阐明了语法。现在,我可以使用示例 DF 来按照您的示例进行操作,并且它似乎可以正常工作。但是当我使用我的 DF 时,即使我运行你的第一个 UDF,我也会收到错误:Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row - 与我一直遇到的相同类型的错误。 我在尝试许多不同的方法时都遇到了同样的错误 - 它让我不知所措,因为我不知道如何处理它! 您能否打印您的数据框的架构对象(例如testDF.schema("features")),并将其添加到您的问题中以供将来参考?我怀疑周围有不同类型的 Vector 对象,看到您也在使用 Cassandra 连接器。 架构是一个普通的|-- features: vector (nullable = true) 我的 DF 的 df.schema("features") 输出是 StructField(feature,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)- 与 testDF 的输出完全相同。这使得这段代码在一个而不是另一个上工作变得更加陌生。【参考方案2】:

您的导入语句似乎有问题。如您所见,CountVectorizer 将使用 ml 包向量,因此,所有向量导入也应使用此包。确保您没有使用旧的mllib 进行任何导入。这包括:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector

有些方法只存在于mllib 包中,因此如果您确实需要使用这种类型的向量,您可以重命名它们(因为名称与ml 向量相同)。例如:

import org.apache.spark.mllib.linalg.Vector => mllibVector

修复所有导入后,您的代码应该可以运行。测试:

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)

将给出如下测试数据框:

+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+

现在给每个索引它自己的列:

val vecToSeq = udf((v: Vector) => v.toArray)

val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)

结果数据Ffame:

+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+

【讨论】:

谢谢。这应该有效,这几乎是我一直在尝试的。当我在 testDF 上运行它时效果很好,但在我的真实 DF 上,它具有由 CountVectoriser 创建的相同的 sparseVector 列,我仍然得到错误(Failed to execute user defined function(anonfun$1: (vector) =&gt; array&lt;double&gt;) . . . Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row)。两个 DF 的 df.struct("features") 相同 (StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))。我不知道为什么? 另外:我的所有导入都很好 - 错误似乎来自最初的 vecToSeq 函数和数据格式不匹配。 @renegademonkey:真实的数据框是什么样的?您可以将其添加到问题中吗? 它太大了(40 个特征与整数、时间戳、双精度、字符串、数组的组合),但这不应该相关吗?如果我将整个 DF 过滤到仅features 列并尝试在其上运行vecToSeq,则会发生相同的错误。 @renegademonkey:features 列的样本就足够了。如果您说该列的 5 行并运行 vecToSeq,它仍然会给出错误吗?

以上是关于将稀疏特征向量分解为单独的列的主要内容,如果未能解决你的问题,请参考以下文章

如何将我的索引向量更改为可在 sklearn 中使用的稀疏特征向量?

特征分解 奇异值分解

对应不同特征值的两个特征向量的乘积等于0,是这样吗?

eigenface 怎样进行人脸识别

如何求稀疏矩阵的全部特征值和特征向量?

电力负荷预测基于matlab日特征气象因素支持向量机SVM电力负荷预测含Matlab源码 1612期