如何将 RDD[Row] 转换为 RDD[Vector]

Posted

技术标签:

【中文标题】如何将 RDD[Row] 转换为 RDD[Vector]【英文标题】:How to convert RDD[Row] to RDD[Vector] 【发布时间】:2016-05-27 16:52:10 【问题描述】:

我正在尝试使用 scala 实现 k-means 方法。 我创建了一个类似的 RDD

val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> 
  sc.parallelize(chunk._2.toSeq).toDF()
)

val examples = df.map(dataframe =>
  dataframe.selectExpr(
    "avg(time) as avg_time",
    "variance(size) as var_size",
    "variance(time) as var_time",
    "count(size) as examples"
  ).rdd
)

val rdd_final=examples.reduce(_ union _)

val kmeans= new KMeans()
val model = kmeans.run(rdd_final)

使用此代码我得到一个错误

type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]  required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

所以我尝试着做:

val rdd_final_Vector = rdd_final.mapx:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)

val model = kmeans.run(rdd_final_Vector)

然后我得到一个错误:

java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector

所以我正在寻找一种方法来进行该演员,但我找不到任何方法。

有什么想法吗?

最好的问候

【问题讨论】:

你看过这个吗? ***.com/a/35915327/2661491 是的,我读过但不明白。我看到“密集”和“getAs”的错误。我将 val rdd_final_Vector 更改为: val rdd_final_Vector= rdd_final.map row => Vectors.dense(row.getAs[Seq[Double]].toArray) 但我收到错误:( 【参考方案1】:

这里至少有几个问题:

    你真的不能将 Row 转换为 Vector:Row 是 Spark SQL 理解的潜在不同类型的集合。 Vector 不是原生 spark sql 类型 SQL 语句的内容与您尝试使用KMeans 实现的内容之间似乎不匹配:SQL 正在执行聚合。但是KMeans 需要一系列单独的数据点,格式为 Vector(封装了 Array[Double])。那么 - 为什么要为 KMeans 操作提供 sumaverage

在这里仅解决#1:您需要按照以下方式做一些事情:

val doubVals = <rows rdd>.map row =>   row.getDouble("colname") 
val vector = Vectors.toDense doubVals.collect

然后你有一个正确封装的Array[Double](在一个向量内),可以提供给Kmeans

【讨论】:

感谢 javadba。我会检查我的代码。我正在提供总和和平均值,因为我得到流数据,并且我正在尝试使用数据的平均值和数据的总和每 n 秒创建一个对象:该对象我将为您提供 KMeans。 给我 classnotfound 异常 @Amalo 。你有一个完全不同的问题:Vectors 类是核心 Spark mllib 组件。

以上是关于如何将 RDD[Row] 转换为 RDD[Vector]的主要内容,如果未能解决你的问题,请参考以下文章

如何将 cassandraRow 转换为 Row(apache spark)?

如何在火花中将rdd对象转换为数据框

如何将三个 RDD 加入一个元组?

将嵌入在 Dataframe 中的 Row RDD 转换为 List

Pyspark 将 RowMatrix 转换为 DataFrame 或 RDD

数据帧到 RDD[Row] 用空值替换空间