使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....

Posted 暗时间&量变

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....相关的知识,希望对你有一定的参考价值。

package com.huawei.bigdata.spark.examples

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by wulei on 2017/8/3.
  */
object PointCorrPredict {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PointCorrPredict")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("use vio_offsite")
    //360111010002,360102029001
    val dataFrame = sqlContext.sql("select kk_id,direct,day,hour,cnt,speed from kk_hour_scale").orderBy("day","hour")
    val newDataFrame = dataFrame.filter("kk_id = ‘3601110100‘and direct = ‘02‘")
                      .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100)
      .rdd.map(row=>row.getAs[Double]("cnt"))
    /*val dd =  newDataFrame.collect().take(3)
   dd.foreach(println)*/
    val destinationDataFrame = sqlContext.sql("select origin_kakou,destination_kakou from kk_relation ")
    val newDestinationDataFrame = destinationDataFrame.filter("origin_kakou = ‘360111010002‘").select("destination_kakou").collect()
    for (i <- 0 until newDestinationDataFrame.length){
      println(newDestinationDataFrame(i))
      println(newDestinationDataFrame(i).toString().substring(1,11))
      println(newDestinationDataFrame(i).toString().substring(11,13))
      val tmpDataFrame = dataFrame.filter("kk_id = ‘"+ newDestinationDataFrame(i).toString().substring(1,11)
                         +"‘ and direct = ‘"+newDestinationDataFrame(i).toString().substring(11,13)+"‘")
                        .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100)
        .rdd.map(row=>row.getAs[Double]("cnt"))
      //tmpDataFrame.foreach(row => println(row))
      var correlationPearson: Double = Statistics.corr(newDataFrame,tmpDataFrame)//计算不同数据之间的相关系数:皮尔逊
      println("\ncorrelationPearson:" + correlationPearson) //打印结果
    }
    println("11111")


  sc.stop()
  }
}

实现代码如上,因为Statistics.corr(RDD[Double],RDD[Double]),所以SparkSQL读取后的数据生成的dataFrame必须转换,第一步是转换成RDD[Row],Row就相当于sql查询出来的一条数据,这里也转换过多次才成功,最后百度得到可以先.cast(DoubleType)的形式。问题自己接触的少,要先看本质,然后看API,然后看案例就快了。

很明显可以从问题的描述上看是组之间的元素个数对应不上,但我已经被Row=>Double转晕了头,没有静心思考琢磨,没有专注仔细的自我对话,导致自己盲目的修改代码,还依然从转换问题上改变,后来转念一想才醒悟,以此警戒自己。limit

以上是关于使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....的主要内容,如果未能解决你的问题,请参考以下文章

皮尔森相关系数和余弦相似度

文本相似度计算(一):距离方法

1.python计算相似度

协同过滤推荐算法

皮尔逊相关系数和余弦相似性的关系

在推荐系统的皮尔逊相关用户-用户相似度矩阵中,NaN 是如何处理的?