大数据Spark MLlib推荐算法

Posted 赵广陆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据Spark MLlib推荐算法相关的知识,希望对你有一定的参考价值。


1 相似度算法

无论是基于用户还是基于商品的推荐,都是需要找到相似的用户或者商品,才能做推荐,所以,相似度算法就变得非常重要了。

常见的相似度算法有:

  • 欧几里德距离算法(Euclidean Distance)
  • 皮尔逊相似度算法(Pearson Correlation Coefficient)
  • 基于夹角余弦相似度算法(Consine Similarity)
  • 基于Tanimoto系数相似度(Tanimoto Coefficient)

1.1 欧几里德距离算法

上图即二维空间中6位用户对Snakes 和 Dupree 这两Item评价的直观体现。

根据两用户之间共同评价的Item为维度,建立一个多维的空间,那么通过用户对单一维度上的评价Score组成的坐标系X(s1,s2,s3……,si)即可定位该用户在这个多维度空间中的位置,那么任意两个位置之间的距离Distance(X,Y)(即:欧式距离)就能在一定程度上反应了两用户兴趣的相似程度。

就其意义而言,欧氏距离越小,两个用户相似度就越大,欧氏距离越大,两个用户相似度就越小。

1.2 基于夹角余弦相似度算法

计算夹角,并得出夹角对应的余弦值,此余弦值就可以用来表征,这两个向量的相似性。夹角越小,余弦值越接近于1,它们的方向更加吻合,则越相似。

计算公式:

2 最近邻域

通过相似度计算,可以计算出邻居,问题来了,我们如果选取出几个邻居作为参考,进行推荐呢?

通常有2种方式:

  • 固定数量的邻居:K-neighborhoods
  • 基于相似度门槛的邻居:Threshold-based neighborhoods

3 交替最小二乘法

交替最小二乘法(ALS)是统计分析中最常用的逼近计算的一种算法,其交替计算结果使得最终结果尽可能地逼近真实结果。而ALS的基础是最小二乘法(LS算法),LS算法是一种常用的机器学习算法,它通过最小化误差的平方和寻找数据的最佳函数匹配。利用最小二乘法可以简便的求得未知的数据,并使得这些求得的数据与实际数据之间误差的平法和为最小。也就是评分未知,稀疏矩阵预测评分.由于ALS算法的目标函数不是凸的,而且变量互相耦合在一起,所以它并不容易求解。但如果把用户特征矩阵U和物品特征矩阵V固定其一,其目标函数就立刻变成了一个凸的而且是可拆分的。

3.1 最小二乘法

以一个变量为例,在二维空间中最小二乘法的原理图如下:

若干个点依次分布在向量空间中,如果希望找出一条直线和这些点达到最佳匹配,那么最简单的一个方法就是希望这些点到直线的距离最小,则可得出最小二乘法的公式如下:

2是开方

这里的f(x)是直接的拟合公式,也是所求的目标函数,在这里希望各个点到直接的值最小,因此第二个公式就是求所有点到该直接的距离,我们可以微分求得其最小值。

3.2 交替最小二乘法

以用户,商品为例说明,一个基于用户名,物品表的用户评分矩阵可以被分解成2个较为小型化的矩阵,即M=U的转置矩阵*V。

在这里U和V分别表示 用户和物品的矩阵,在MLlib的ALS算法中,首先会对U或者V矩阵随机生成,之后固定某一个特定的对象,去求取另一个未随机化的矩阵对象。

之后利用求取的矩阵对象去求随机化矩阵对象。最后两个对象相互迭代计算,求取与实际矩阵差异达到程序设定的最小阀值位置。

通俗的说就是先固定U矩阵,求取V,然后再固定V矩阵再求取U矩阵,一直这样交替迭代计算直到误差达到一定的阀值条件或者达到迭代次数的上限。例如,固定U求V,这个问题就是经典的最小二乘问题。所谓交替,就是指先随机生成U(0),然后固定它,去 求 解V ( 0 ) ;再 固 定V ( 0 ) ,然 后 求 解U ( 1 ) ,这 样 交 替 进 行 下 去 。 因 为 每一次迭代都会降低重构误差,并且误差是有下界的,所以ALS算法一定会收敛。但由于目标函数是非凸的,所以ALS算法并不保证会收敛到全局最优解。然而在实际应用中,ALS算法对初始
点不是很敏感,且是不是全局最优解也不会有大的影响。

3.3 ALS算法流程

通常,产品的用户评分矩阵是庞大且稀疏的,因此在非常稀疏的数据集上采用简单的用户(或物品)相似度比较进行推荐,直观上给人的感觉是这样做缺少依据。理论上分析一下我们也能理解,基于记忆的协同过滤推荐实际上并没有充分挖掘数据集中的潜在因素。本节介绍的交替最小二乘法(AlternatingLeastSquares,ALS)算法,其核心思想就是要进一步挖掘通过观察得到的所有用户给产品的评分,并通过引入用户特征矩阵(UserFeaturesMatrix)和物品特征矩阵(ItemFeaturesMatrix)来建立一个机器学习模型,然后利用采集的数据对这个模型进行训练(反复迭代),最后得到用于推荐计算的用户特征矩阵和物品特征矩阵,从而来推断(也就是预测)每个用户的喜好并向用户推荐适合的物品。

ALS算法解决了用户评分矩阵中的缺失因子问题,实现了用预测得到的缺失因子进行推荐。

ALS 算法的思路就不同了。 ALS 算法基于下面这个假设:评分矩阵是近似低秩 (Low・Rank) 矩阵;换句话说,评分矩阵 A(mxn) 可以用两个小矩阵 U(mxk) 和 V(nxk) 来近似表示,即:

式中, k 远小于 m 和 n, 这样就把整个系统的自由度从 O(mn) 降到了 O((m+n)k) 。其实,
ALS 算法的低秩假设是建立在客观存在的合理性基础上的。例如,用户特征有很多,如年龄、性别、职业、身高、学历、婚姻、地区、存款等,可以说不胜枚举,但我们没有必要把用户的所有特征都用起来,因为并不是所有特征都起同样的作用。例如,在后面展示的电影推荐示例中,用户特征矩阵仅仅包含了用户编号、性别、年龄、职业、邮编5 个字段。同样,物品的属性也有很多,以电影为例,可以有主演、导演、特效、剧情、类型等,但实际应用中我们只需要描述少数关键属性即可,因此我们仅仅考虑了三个属性,即电影编号、电影名和电影类别(当然这只是示例,到底 k 取什么值,可以采用系统自适应调节方法,通过应用逐步找到最佳的 k 值)。

总之, ALS 算法的巧妙之处就在于,引入了两个特征矩阵,一个是用户特征矩阵,用 U表示,另一个是物品特征矩阵,用 V 表示,这两个矩阵的秩都比较低。接下来的问题是怎样得到这两个抽象的低秩序阵。既然已经假设评分矩阵 A 可以通过UVT 来近似,那么一个最直接的可以量化的东西就是通过 U 和 V 重构 A 时产生的误差。在ALS 算法中,使用式给出的 Frobenius 范数(又称为 Euclid 范数)

来表示重构误差,也就是每个元素的重构误差的平方和,如式所示。

这里存在一个问题,由于只观察到部分评分,A中有大量的未知元素是需要推断的,所以这个重构误差包含了未知数。解决方案很简单,就是只计算对已知评分的重构误差。

当然,也可以先用一个简单的方法把评分矩阵填满,再进行重构误差计算,但是这样做似乎也没有太多道理。总之,ALS算法就是求解下面的优化问题:

经过上面的处理,一个协同推荐问题就通过低秩假设被成功地转换成了一个优化问题。
但是,这个优化问题怎么解呢?不要忘记,我们的目标是求出U和V这两个矩阵。

ALS 算法可以大体描述如下。

第一步,用小于 1 的数随机初始化 V 。
第二步,在训练数据集上反复迭代、交替计算 U 和 V, 直到 RMSE (均方根误差,一种常用的离散性度量方法)值收敛或迭代次数足够多。
第三步,返回 UVT, 进行预测推荐。之所以说上述算法是一个大体描述,是因为第二步中还包含了如何计算 U 和 V 的表达式,它们是通过求偏导推出的。

3.4 ALS算法实战

3.4.1 数据说明

ALS 算法对 GroupLens Research ( http://grouplens.org/datasets/movielens/) 提供的数据进行学习并推荐。该数据为一组从 20 世纪 90 年代末到 21 世纪初由 MovieLens 用户提供的电影评价数据,包括评分、电影元数据(如风格类型和年代),以及关于用户的人口统计学数据(如年龄、邮编、性别和职业等)。根据不同需求, GroupLens Research 提供了不同大小的样本数据,包含了评分、用户信息和电影信息三种数据。下面先来看看待处理的电影评价数据,再给出应用程序并进行分析。

示例数据是用户、电影、评分的数据,其中用户有943人,电影有1682部,评价有100000条。文件在资料中。

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013
62	257	2	879372434
286	1014	5	879781125
200	222	5	876042340
210	40	3	891035994
................

3.4.2 数据建模

ALS算法的第二步就是数据建模,其实在MLlib算法库中有可以直接使用的训练算法,ALS.tran方法源码如下:

 def train(
      ratings: RDD[Rating],       //需要训练的数据集
      rank: Int,                  //模型中隐藏因子数,rank一般选在8到20之间
      iterations: Int,            //算法中迭代次数,一般10次即可
      lambda: Double,             //ALS中的正则化参数,一般设置0.01
      blocks: Int,                //并行计算的block数(-1为自动配置)
      alpha: Double,              //ALS隐式反馈变化率用于控制每次拟合修正的幅度
      seed: Long                  //加载矩阵的随机数
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
  }

3.4.3 实战

package cn.oldlu.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

public class MyRecommend {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("MyRecommend")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN"); //设置日志级别

        JavaRDD<String> rawData = jsc.textFile("F://ml-100k//u.data");//设置数据集文件
        JavaRDD<String[]> rawRatings = rawData.map(v1 -> v1.split("\\t"));//将数据按照\\t分割
        //转化为Rating结构,参数分别为:用户id,商品id,评分
        JavaRDD<Rating> ratings = rawRatings.map(v1 -> new Rating(Integer.valueOf(v1[0]), Integer.valueOf(v1[1]), Double.valueOf(v1[2])));

        //设置训练模型
        MatrixFactorizationModel model = ALS.train(ratings.rdd(), 8, 10, 0.01);

        //为789用户推荐10个商品
        Rating[] recommendProducts = model.recommendProducts(789, 10);

        //打印推荐结果
        for (Rating rating : recommendProducts) {
            System.out.println(rating.user() + "->" + rating.product()+": " + rating.rating());
        }
    }
}

3.4.4 优化改进

在上面的实战中,rank、iterations、lambda参数都是写死的,根据不同环境和数据集需要作出调整,所以我们需要计算中最佳的参数,才能得到最佳的训练集。

package cn.oldlu.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;

import java.util.List;

public class MyRecommend2 {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("MyRecommend")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN"); //设置日志级别

        JavaRDD<String> rawData = jsc.textFile("F://ml-100k//u.data");//设置数据集文件
        JavaRDD<String[]> rawRatings = rawData.map(v1 -> v1.split("\\t"));//将数据按照\\t分割
        //装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)
        JavaPairRDD<Long, Rating> ratings = rawRatings.mapToPair(v1 -> {
            Rating rating = new Rating(Integer.valueOf(v1[0]), Integer.valueOf(v1[1]), Double.valueOf(v1[2]));
            return new Tuple2<>(Long.valueOf(v1[3]) % 10, rating);
        });

        //装载电影目录对照表(电影ID->电影标题)
        List<Tuple2> movies = jsc.textFile("F://ml-100k//u.item").map(v1 -> {
            String[] ss = v1.split("\\\\|");
            return new Tuple2(ss[0], ss[1]);
        }).collect();

        //统计有用户数量和电影数量以及用户对电影的评分数目
        Long numRatings = ratings.count();
        Long numUsers = ratings.map(v1 -> ((Rating) v1._2()).user()).distinct().count();
        Long numMovies = ratings.map(v1 -> ((Rating) v1._2()).product()).distinct().count();
        System.out.println("用户:" + numUsers + "电影:" + numMovies + "评论:" + numRatings);

        //将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)
        //该数据在计算过程中要多次应用到,所以cache到内存

        Integer numPartitions = 4; // 分区数
        // 训练集
        JavaRDD<Rating> training = ratings
                .filter(v -> v._1() < 6)
                .values()
                .repartition(numPartitions)
                .cache();

        // 校验集
        JavaRDD<Rating> validation = ratings
                .filter(v -> v._1() >= 6 && v._1() < 8)
                .values()
                .repartition(numPartitions).cache();

        // 测试集
        JavaRDD<Rating> test = ratings
                .filter(v -> v._1() >= 8)
                .values()
                .cache();

        Long numTraining = training.count();
        Long numValidation = validation.count();
        Long numTest = test.count();
        System.out.println("训练集:" + numTraining + " 校验集:" + numValidation + " 测试集:" + numTest);

        //训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模
        int[] ranks = new int[]{10, 11, 12};
//        double[] lambdas = new double[]{0.01, 0.03, 0.1, 0.3, 1, 3};
        double[] lambdas = new double[]{0.01};
//        int[] numIters = new int[]{8, 9, 10, 11, 12, 13, 14, 15};
        int[] numIters = new int[]{8, 9, 10};

        MatrixFactorizationModel bestModel = null;
        double bestValidationRmse = Double.MAX_VALUE;
        int bestRank = 0;
        double bestLambda = -0.01;
        int bestNumIter = 0;

        for (int rank : ranks) {
            for (int numIter : numIters) {
                for (double lambda : lambdas) {
                    MatrixFactorizationModel model = ALS.train(training.rdd(), rank, numIter, lambda);
                    Double validationRmse = computeRmse(model, validation, numValidation);
                    System.out.println("RMSE(校验集) = " + validationRmse + ", rank = " + rank + ", lambda = " + lambda + ", numIter = " + numIter);

                    if (validationRmse < bestValidationRmse) {
                        bestModel = model;
                        bestValidationRmse = validationRmse;
                        bestRank = rank;
                        bestLambda = lambda;
                        bestNumIter = numIter;
                    }
                }
            }
        }



        double testRmse = computeRmse(bestModel, test, numTest);
        System.out.println("测试数据集在 最佳训练模型 rank = " + bestRank + ", lambda = " + bestLambda + ", numIter = " + bestNumIter + ", RMSE = " + testRmse);

        // 计算均值
        Double meanRating = training.union(validation).mapToDouble(v -> v.rating()).mean();

        // 计算标准误差值
        Double baselineRmse = Math.sqrt(test.map(v -> (meanRating - v.rating()) * (meanRating - v.rating())).reduce((v1, v2) -> (v1 + v2) / numTest));

        // 计算准确率提升了多少
        double improvement = (baselineRmse - testRmse) / baselineRmse * 100;

        System.out.println("最佳训练模型的准确率提升了:" + String.format("%.2f", improvement) + "%.");


        // 构建最佳训练模型
        bestModel = ALS.

以上是关于大数据Spark MLlib推荐算法的主要内容,如果未能解决你的问题,请参考以下文章

大数据Spark MLlib基于模型的协同过滤

基于Spark MLlib平台的协同过滤算法---电影推荐系统

和美大家说 | 基于Spark MLlib的文本大数据处理

如何利用Spark MLlib进行个性推荐?

机器学习讲座,如何利用Spark MLlib进行个性推荐?

资料推荐:Spark-mllib 源码分析之逻辑回归