如何将 RDD[Row] 转换为 RDD[Vector]
Posted
技术标签:
【中文标题】如何将 RDD[Row] 转换为 RDD[Vector]【英文标题】:How to convert RDD[Row] to RDD[Vector] 【发布时间】:2016-05-27 16:52:10 【问题描述】:我正在尝试使用 scala 实现 k-means 方法。 我创建了一个类似的 RDD
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=>
sc.parallelize(chunk._2.toSeq).toDF()
)
val examples = df.map(dataframe =>
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
)
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)
使用此代码我得到一个错误
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
所以我尝试着做:
val rdd_final_Vector = rdd_final.mapx:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)
val model = kmeans.run(rdd_final_Vector)
然后我得到一个错误:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector
所以我正在寻找一种方法来进行该演员,但我找不到任何方法。
有什么想法吗?
最好的问候
【问题讨论】:
你看过这个吗? ***.com/a/35915327/2661491 是的,我读过但不明白。我看到“密集”和“getAs”的错误。我将 val rdd_final_Vector 更改为: val rdd_final_Vector= rdd_final.map row => Vectors.dense(row.getAs[Seq[Double]].toArray) 但我收到错误:( 【参考方案1】:这里至少有几个问题:
-
不你真的不能将 Row 转换为 Vector:Row 是
Spark SQL
理解的潜在不同类型的集合。 Vector
不是原生 spark sql 类型
SQL 语句的内容与您尝试使用KMeans
实现的内容之间似乎不匹配:SQL 正在执行聚合。但是KMeans
需要一系列单独的数据点,格式为 Vector(封装了 Array[Double]
)。那么 - 为什么要为 KMeans
操作提供 sum
和 average
?
在这里仅解决#1:您需要按照以下方式做一些事情:
val doubVals = <rows rdd>.map row => row.getDouble("colname")
val vector = Vectors.toDense doubVals.collect
然后你有一个正确封装的Array[Double]
(在一个向量内),可以提供给Kmeans
。
【讨论】:
感谢 javadba。我会检查我的代码。我正在提供总和和平均值,因为我得到流数据,并且我正在尝试使用数据的平均值和数据的总和每 n 秒创建一个对象:该对象我将为您提供 KMeans。 给我 classnotfound 异常 @Amalo 。你有一个完全不同的问题:Vectors
类是核心 Spark
mllib 组件。以上是关于如何将 RDD[Row] 转换为 RDD[Vector]的主要内容,如果未能解决你的问题,请参考以下文章
如何将 cassandraRow 转换为 Row(apache spark)?
将嵌入在 Dataframe 中的 Row RDD 转换为 List