Spark: "Can only zip RDDs with same number of elements in each partition" when computing the Pearson correlation with Statistics.corr
Posted by 暗时间&量变
package com.huawei.bigdata.spark.examples

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by wulei on 2017/8/3.
 */
object PointCorrPredict {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PointCorrPredict")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("use vio_offsite")

    // 360111010002,360102029001
    val dataFrame = sqlContext.sql("select kk_id,direct,day,hour,cnt,speed from kk_hour_scale")
      .orderBy("day", "hour")

    // Base series: the latest 100 counts for checkpoint 3601110100, direction 02,
    // cast to Double and unwrapped from Row so Statistics.corr can consume it.
    val newDataFrame = dataFrame.filter("kk_id = '3601110100' and direct = '02'")
      .orderBy(dataFrame("day").desc, dataFrame("hour").desc)
      .select(dataFrame.col("cnt").cast(DoubleType))
      .limit(100)
      .rdd.map(row => row.getAs[Double]("cnt"))

    /*val dd = newDataFrame.collect().take(3)
    dd.foreach(println)*/

    val destinationDataFrame = sqlContext.sql("select origin_kakou,destination_kakou from kk_relation")
    val newDestinationDataFrame = destinationDataFrame.filter("origin_kakou = '360111010002'")
      .select("destination_kakou").collect()

    for (i <- 0 until newDestinationDataFrame.length) {
      println(newDestinationDataFrame(i))
      // Row.toString() looks like "[360111010002]": chars 1-10 are the kk_id,
      // chars 11-12 are the direction code.
      println(newDestinationDataFrame(i).toString().substring(1, 11))
      println(newDestinationDataFrame(i).toString().substring(11, 13))
      val tmpDataFrame = dataFrame.filter("kk_id = '" + newDestinationDataFrame(i).toString().substring(1, 11) +
          "' and direct = '" + newDestinationDataFrame(i).toString().substring(11, 13) + "'")
        .orderBy(dataFrame("day").desc, dataFrame("hour").desc)
        .select(dataFrame.col("cnt").cast(DoubleType))
        .limit(100)
        .rdd.map(row => row.getAs[Double]("cnt"))
      //tmpDataFrame.foreach(row => println(row))

      // Pearson correlation between the two series; this is the call that throws
      // "Can only zip RDDs with same number of elements in each partition".
      val correlationPearson: Double = Statistics.corr(newDataFrame, tmpDataFrame)
      println("\ncorrelationPearson:" + correlationPearson) // print the result
    }
    println("11111")
    sc.stop()
  }
}
The implementation is shown above. Because Statistics.corr expects (RDD[Double], RDD[Double]), the DataFrame returned by the Spark SQL query has to be converted: the first step is DataFrame to RDD[Row], where each Row corresponds to one record of the SQL result. It took several failed attempts before the conversion worked; a web search finally turned up the approach of first casting the column with .cast(DoubleType). When a problem is unfamiliar, look at its essence first, then the API, then examples; that is the fastest route.
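In isolation, the conversion chain looks like this (a minimal sketch; the table and column names follow the post, everything else is illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// DataFrame -> RDD[Row] -> RDD[Double], using the same sqlContext as above.
val cnts: RDD[Double] = sqlContext.sql("select cnt from kk_hour_scale")
  .select(col("cnt").cast(DoubleType).alias("cnt")) // force a numeric type, keep the name
  .rdd                                              // each Row holds one query record
  .map(_.getAs[Double]("cnt"))                      // unwrap the Double from the Row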
The error message itself makes the cause plain: the two RDDs do not have the same number of elements in each partition. But I was already dizzy from the Row => Double conversion and, instead of calmly thinking it through in a focused inner dialogue, kept blindly rewriting the conversion code. Only later did it click that the culprit was the limit(100) step: each series holds 100 elements in total, yet the per-partition counts of the two RDDs differ, so the zip that corr performs internally fails. I note this here as a warning to myself.
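One possible fix (my own assumption, not spelled out in the original post) is to force both series into a single partition before calling corr, so the per-partition element counts trivially match. coalesce(1) does this without a shuffle and preserves element order, which matters because corr pairs elements positionally:

import org.apache.spark.mllib.stat.Statistics

// Sketch: collapse both 100-element series into one partition each, so the
// zip inside Statistics.corr sees matching element counts per partition.
val x = newDataFrame.coalesce(1)
val y = tmpDataFrame.coalesce(1)
val correlationPearson: Double = Statistics.corr(x, y, "pearson")

Since limit(100) keeps both series small anyway, collecting them to the driver and re-parallelizing with sc.makeRDD(values, 1) would work just as well.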