Convert a Spark Vector of features into an array

Posted: 2017-09-05 10:59:41

【Question】:

I have a column of features that is packed into a Vector of vectors using Spark's VectorAssembler, as follows. data is the input DataFrame (of type spark.sql.DataFrame).

import org.apache.spark.ml.feature.VectorAssembler

val featureCols = Array("feature_1", "feature_2", "feature_3")
val featureAssembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val dataWithFeatures = featureAssembler.transform(data)

I am developing a custom classifier using the Classifier and ClassificationModel developer API. ClassificationModel requires implementing a predictRaw() function, which outputs a vector of predicted labels from the model.

def predictRaw(features: FeaturesType) : Vector

This function is fixed by the API: it accepts a features argument of type FeaturesType and outputs a Vector (in my case I am treating this as a Spark DenseVector, since DenseVector extends the Vector trait).
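For context, here is a rough, hypothetical sketch (not from the question) of where predictRaw sits in a custom model under the Spark 2.x developer API; the class name and the weights parameter are illustrative only:

import org.apache.spark.ml.classification.ClassificationModel
import org.apache.spark.ml.linalg.{DenseVector, Vector}
import org.apache.spark.ml.param.ParamMap

// Hypothetical two-class model; `weights` is an assumed parameter.
class MyModel(override val uid: String, weights: Array[Double])
    extends ClassificationModel[Vector, MyModel] {

  override def numClasses: Int = 2

  // `features` here is the assembled vector for a single row.
  override def predictRaw(features: Vector): Vector = {
    val score = features.toArray.zip(weights).map { case (x, w) => x * w }.sum
    new DenseVector(Array(-score, score))
  }

  override def copy(extra: ParamMap): MyModel = new MyModel(uid, weights)
}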

Because of the VectorAssembler packing, the features column is of type Vector, and each element is itself a vector of the original features for one training sample. For example:

features column - type Vector
[1.0, 2.0, 3.0] - element 1, itself a vector
[3.5, 4.5, 5.5] - element 2, itself a vector

I need to extract these features into an Array[Double] in order to implement my predictRaw() logic. Ideally, to preserve cardinality, I would like the following result:

val result: Array[Double] = Array(1.0, 3.5, 2.0, 4.5, 3.0, 5.5)

i.e. in column-major order, since I will turn this into a matrix.
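For reference, a small sketch of that column-major flattening on its own, once each row's features are available as plain arrays (values match the example above):

// Two rows of features, each already extracted to Array[Double].
val rows: Array[Array[Double]] = Array(
  Array(1.0, 2.0, 3.0),
  Array(3.5, 4.5, 5.5)
)

// Column-major flattening: transpose, then concatenate,
// giving Array(1.0, 3.5, 2.0, 4.5, 3.0, 5.5).
val result: Array[Double] = rows.transpose.flatten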

I have tried:

val array = features.toArray // this gives an array of vectors and doesn't work

I have also tried passing the features in as a DataFrame rather than a Vector, but the API expects a Vector because of the packing done by VectorAssembler. For example, the following function works in itself, but does not conform to the API, since it expects FeaturesType to be a Vector rather than a DataFrame:

def predictRaw(features: DataFrame): DenseVector = {
  val featuresArray: Array[Double] =
    features.rdd.map(r => r.getAs[Vector](0).toArray).collect.flatten  // flatten is row-major
  // ... rest of the logic would go here ...
  new DenseVector(featuresArray)
}

My problem is that features is of type Vector, not DataFrame. The other option might be to pack features as a DataFrame, but I don't know how to do that without using VectorAssembler.

All suggestions appreciated, thanks! I have looked at Access element of a vector in a Spark DataFrame (Logistic Regression probability vector), but that is in Python and I am working in Scala.


【Solution 1】:

If you just want to convert the DenseVector into an Array[Double], that is fairly simple with a UDF:

import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.functions.udf
import spark.implicits._  // for the 'features Symbol syntax; assumes a SparkSession named `spark`

val toArr: Any => Array[Double] = _.asInstanceOf[DenseVector].toArray
val toArrUdf = udf(toArr)
val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", toArrUdf('features))

This gives you a new column:

|-- features_arr: array (nullable = true)
|    |-- element: double (containsNull = false)
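A follow-up sketch, relevant to the comment below: once the column is an array, individual elements can be pulled out with getItem (column names as above):

import org.apache.spark.sql.functions.col

// Read individual features back out of the array column.
dataWithFeaturesArr
  .select(col("features_arr").getItem(0).as("f1"),
          col("features_arr").getItem(1).as("f2"))
  .show(1)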

【Comments】:

Hi - I'm not sure these are actually doing what I want. Using the extract_features UDF above, I seem to get back a column identical to the features column: a truncated show() displays features and extracted_features side by side with exactly the same values, e.g. [-9.5357,0.016682...| next to [-9.5357, 0.01668...|, "only showing top 1 row". If I then do the following: val featuresArray1: Array[Double] = temp.rdd.map(r => r.getAs[Double](0)).collect (using index elements 1 and 2) - will ask another question as I'm running out of space here

I think the problem is that toArray gives an array of 3 elements for each row, and I then struggle to access those elements. I will ask a separate question so this is clearer. Please take a look, thanks

【Solution 2】:

Spark 3.0 added a vector_to_array UDF, so there is no need to implement this yourself: https://github.com/apache/spark/pull/26910. The implementation in that PR looks like this:

import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.mllib.linalg.{Vector => OldVector}

private val vectorToArrayUdf = udf { vec: Any =>
    vec match {
      case v: Vector => v.toArray
      case v: OldVector => v.toArray
      case v => throw new IllegalArgumentException(
        "function vector_to_array requires a non-null input argument and input type must be " +
        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
        s"but got ${ if (v == null) "null" else v.getClass.getName }.")
    }
  }.asNonNullable()
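With Spark 3.0+ you would just call the built-in function rather than copying the snippet above; a minimal usage sketch (dataWithFeatures is the assembled DataFrame from the question):

import org.apache.spark.sql.functions.col
import org.apache.spark.ml.functions.vector_to_array

// Converts the ml (or mllib) vector column into array<double>.
val dataWithArr = dataWithFeatures.withColumn("features_arr", vector_to_array(col("features")))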


【Solution 3】:

Here is a way (without a UDF) to get a DataFrame (String, Array) from a DataFrame (String, Vector). The main idea is to go through an intermediate RDD, cast to Vector, and use its toArray method:

import org.apache.spark.ml.linalg.Vector
import spark.implicits._  // for toDF; assumes a SparkSession named `spark`

val arrayDF = vectorDF.rdd
    .map(x => x.getAs[String](0) -> x.getAs[Vector](1).toArray)
    .toDF("word", "array")
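As a quick sanity check, printing the schema of the result should show the array column (assuming the input really is (String, Vector)):

arrayDF.printSchema()
// root
//  |-- word: string (nullable = true)
//  |-- array: array (nullable = true)
//  |    |-- element: double (containsNull = false)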


【Solution 4】:

My case: the raw data after word2vec:

result.show(10,false)

+-------------+-----------------------------------------------------------------------------------------------------------+
|ip           |features                                                                                                   |
+-------------+-----------------------------------------------------------------------------------------------------------+
|1.1.125.120  |[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.11.114.150 |[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.116.114.36 |[0.022845590487122536,-0.012075710110366344,-0.034423209726810455,-0.04642726108431816,0.09164007753133774]|
|1.117.21.102 |[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.13.5   |[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.130.2  |[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.132.162|[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.133.166|[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.136.170|[0.0,0.0,0.0,0.0,0.0]                                                                                      |
|1.119.137.154|[0.0,0.0,0.0,0.0,0.0]                                                                                      |
+-------------+-----------------------------------------------------------------------------------------------------------+

I want to drop the ips whose embeddings are all zeros:

import org.apache.spark.sql.functions.{udf, typedLit}
import org.apache.spark.ml.linalg.Vector
import spark.implicits._  // for $"..."; assumes a SparkSession named `spark`

val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic
val output = result.select($"ip", vecToSeq($"features").alias("features"))

// `=!=` is Spark's inequality operator for Columns; typedLit builds the array literal.
val select_output = output.filter(output("features") =!= typedLit(Array(0.0, 0.0, 0.0, 0.0, 0.0)))
select_output.show(5)


+-------------+--------------------+
|           ip|            features|
+-------------+--------------------+
| 1.116.114.36|[0.02284559048712...|
| 1.119.137.98|[-0.0244039318391...|
|1.119.177.102|[-0.0801128149032...|
|1.119.186.170|[0.01125990878790...|
|1.119.193.226|[0.04201301932334...|
+-------------+--------------------+
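For what it's worth, a sketch of an alternative that skips the array conversion and filters on the vector directly (numNonzeros is part of the ml Vector API; result and features are the names used above):

import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.Vector

// Keep only rows whose embedding has at least one non-zero component.
val isNonZero = udf((v: Vector) => v.numNonzeros > 0)
val filtered = result.filter(isNonZero($"features"))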

