Convert a Spark Vector of features into an array
【Posted】: 2017-09-05 10:59:41
【Question】: I have a column of features that is packaged into a vector of vectors using Spark's VectorAssembler, as shown below. data is the input DataFrame (of type spark.sql.DataFrame).
val featureCols = Array("feature_1","feature_2","feature_3")
val featureAssembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val dataWithFeatures = featureAssembler.transform(data)
I am developing a custom classifier using the Classifier and ClassificationModel developer APIs. ClassificationModel requires a predictRaw() function to be implemented, which outputs a vector of raw predictions from the model:
def predictRaw(features: FeaturesType) : Vector
This signature is fixed by the API: it accepts the features argument of type FeaturesType and outputs a Vector (which in my case I am treating as a Spark DenseVector, since DenseVector extends the Vector trait).
Due to the packaging by VectorAssembler, the features column is of type Vector, and each element is itself a vector of the original features for each training sample. For example:
features column - type Vector
[1.0, 2.0, 3.0] - element 1, which is itself a vector
[3.5, 4.5, 5.5] - element 2, which is itself a vector
I need to extract these features into an Array[Double] in order to implement my predictRaw() logic. Ideally, in order to preserve cardinality, I would like the following result:
val result: Array[Double] = Array(1.0, 3.5, 2.0, 4.5, 3.0, 5.5)
i.e. in column-major order, since I will be turning this into a matrix.
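For reference, a minimal plain-Scala sketch of that column-major interleaving, using the hypothetical per-row arrays from the example above (not Spark-specific):

// Hypothetical per-row feature arrays from the example above.
val rows: Seq[Array[Double]] = Seq(Array(1.0, 2.0, 3.0), Array(3.5, 4.5, 5.5))
// Transposing turns rows into columns; flattening then yields column-major order.
val columnMajor: Array[Double] = rows.transpose.flatten.toArray
// columnMajor == Array(1.0, 3.5, 2.0, 4.5, 3.0, 5.5)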
I have tried:
val array = features.toArray // this gives an array of vectors and doesn't work
I also tried passing the features in as a DataFrame object rather than a Vector, but the API expects a Vector because of how the features are packaged by VectorAssembler. For example, the following function works in its own right, but does not conform to the API, since it expects FeaturesType to be a Vector rather than a DataFrame:
def predictRaw(features: DataFrame): DenseVector = {
  // collect each row's feature vector and flatten into a single Array[Double]
  val featuresArray: Array[Double] = features.rdd.map(r => r.getAs[Vector](0).toArray).collect.flatten
  // rest of logic would go here
}
My problem is that features is of type Vector, not DataFrame. The other option might be to package features as a DataFrame, but I don't know how to do that without using VectorAssembler.
All suggestions appreciated, thanks! I have looked at Access element of a vector in a Spark DataFrame (Logistic Regression probability vector), but that is in Python and I am using Scala.
【Question comments】:
【Answer 1】: If you just want to convert a DenseVector into an Array[Double], this is fairly simple with a UDF:
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.functions.udf

val toArr: Any => Array[Double] = _.asInstanceOf[DenseVector].toArray
val toArrUdf = udf(toArr)
val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", toArrUdf('features))
This will give you a new column:
|-- features_arr: array (nullable = true)
| |-- element: double (containsNull = false)
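If the arrays are then needed on the driver, a possible follow-up (a sketch, assuming spark.implicits._ is in scope and using the dataWithFeaturesArr name from above):

import spark.implicits._

// Collect the new array column back to the driver, one Seq[Double] per row.
val collected: Array[Seq[Double]] = dataWithFeaturesArr.select("features_arr").as[Seq[Double]].collect()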
【Comments】:
Hi - I'm not sure this is actually doing what I want. Using the extract_features UDF above, I seem to get back a column identical to the features column, like this:
+--------------------+--------------------+
|features            |extracted_features  |
+--------------------+--------------------+
|[-9.5357,0.016682...|[-9.5357, 0.01668...|
+--------------------+--------------------+
only showing top 1 row
In other words, the features column and the extracted features look exactly the same. I can access each element like this: val featuresArray1: Array[Double] = temp.rdd.map(r => r.getAs[Double](0)).collect (and with index elements 1 and 2) - will ask another question as I'm running out of space here.

I think the problem is that toArray gives an array of 3 elements for each row, and I then struggle to access those elements. I'm going to ask a separate question so this is clearer. Please take a look, thanks.

【Answer 2】:
Spark 3.0 added a vector_to_array UDF, so there is no need to implement this yourself: https://github.com/apache/spark/pull/26910
import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.mllib.linalg.{Vector => OldVector}

private val vectorToArrayUdf = udf { vec: Any =>
  vec match {
    case v: Vector => v.toArray
    case v: OldVector => v.toArray
    case v => throw new IllegalArgumentException(
      "function vector_to_array requires a non-null input argument and input type must be " +
      "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
      s"but got ${ if (v == null) "null" else v.getClass.getName }.")
  }
}.asNonNullable()
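For reference, a short sketch of calling the built-in function (Spark 3.0+), using the dataWithFeatures DataFrame from the question as an assumed input:

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// Adds an array<double> column alongside the original vector column.
val dataWithArr = dataWithFeatures.withColumn("features_arr", vector_to_array(col("features")))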
【Comments】:
【Answer 3】: Here is a way (without a UDF) to get a DataFrame (String, Array) from a DataFrame (String, Vector). The main idea is to go through an intermediate RDD, cast to a Vector, and use its toArray method:
import org.apache.spark.ml.linalg.Vector
import spark.implicits._  // assumes a SparkSession named spark; needed for .toDF

val arrayDF = vectorDF.rdd
  .map(x => x.getAs[String](0) -> x.getAs[Vector](1).toArray)
  .toDF("word", "array")
【Comments】:
【Answer 4】: My case: the raw data after word2vec:
result.show(10,false)
+-------------+-----------------------------------------------------------------------------------------------------------+
|ip |features |
+-------------+-----------------------------------------------------------------------------------------------------------+
|1.1.125.120 |[0.0,0.0,0.0,0.0,0.0] |
|1.11.114.150 |[0.0,0.0,0.0,0.0,0.0] |
|1.116.114.36 |[0.022845590487122536,-0.012075710110366344,-0.034423209726810455,-0.04642726108431816,0.09164007753133774]|
|1.117.21.102 |[0.0,0.0,0.0,0.0,0.0] |
|1.119.13.5 |[0.0,0.0,0.0,0.0,0.0] |
|1.119.130.2 |[0.0,0.0,0.0,0.0,0.0] |
|1.119.132.162|[0.0,0.0,0.0,0.0,0.0] |
|1.119.133.166|[0.0,0.0,0.0,0.0,0.0] |
|1.119.136.170|[0.0,0.0,0.0,0.0,0.0] |
|1.119.137.154|[0.0,0.0,0.0,0.0,0.0] |
+-------------+-----------------------------------------------------------------------------------------------------------+
I want to remove the IPs whose embeddings are all zeros:
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.Vector
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic
val output = result.select($"ip",vecToSeq($"features").alias("features"))
val select_output = output.filter(output("features")!==Array(0,0,0,0,0))
select_output.show(5)
+-------------+--------------------+
| ip| features|
+-------------+--------------------+
| 1.116.114.36|[0.02284559048712...|
| 1.119.137.98|[-0.0244039318391...|
|1.119.177.102|[-0.0801128149032...|
|1.119.186.170|[0.01125990878790...|
|1.119.193.226|[0.04201301932334...|
+-------------+--------------------+
【Comments】: