通过 pyspark.ml CrossValidator 调整隐式 pyspark.ml ALS 矩阵分解模型的参数

Posted

技术标签:

【中文标题】通过 pyspark.ml CrossValidator 调整隐式 pyspark.ml ALS 矩阵分解模型的参数【英文标题】:Tuning parameters for implicit pyspark.ml ALS matrix factorization model through pyspark.ml CrossValidator 【发布时间】:2016-05-16 18:36:08 【问题描述】:

我正在尝试调整使用隐式数据的 ALS 矩阵分解模型的参数。为此,我尝试使用 pyspark.ml.tuning.CrossValidator 来运行参数网格并选择最佳模型。我相信我的问题出在评估者身上,但我无法弄清楚。

我可以让它适用于带有回归 RMSE 评估器的显式数据模型,如下所示:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import rand


conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [8, 12]) \
                    .addGrid(alsExplicit.maxIter, [10, 15]) \
                    .addGrid(alsExplicit.regParam, [1.0, 10.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)

predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()

当我尝试对隐式数据执行此操作时(假设是观看次数而不是评分),我收到一个我无法弄清楚的错误。这是代码(与上面非常相似):

dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
                                 ["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)

predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()

我尝试使用 RMSE 评估器执行此操作,但出现错误。据我了解,我还应该能够将 AUC 度量用于二​​元分类评估器,因为隐式矩阵分解的预测是用于预测二元矩阵 p_ui per this paper 的置信矩阵 c_ui,这是 pyspark ALS 的文档引用。

使用任一评估器都会给我一个错误,我找不到任何关于在线交叉验证隐式 ALS 模型的富有成果的讨论。我正在查看 CrossValidator 源代码,试图找出问题所在,但遇到了麻烦。我的一个想法是,在该过程将隐式数据矩阵 r_ui 转换为二进制矩阵 p_ui 和置信矩阵 c_ui 之后,我不确定它在评估阶段将预测的 c_ui 矩阵与什么进行比较。

这是错误:

Traceback (most recent call last):

  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

etc.......

更新

我尝试缩放输入,使其在 0 到 1 的范围内,并使用 RMSE 评估器。在我尝试将其插入 CrossValidator 之前,它似乎运行良好。

以下代码有效。我得到预测,并从我的评估者那里得到一个 RMSE 值。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


conf = SparkConf() \
  .setAppName("ALSPractice") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0),            (0,3,0.0), (0,4,0.0), 
                                        (1,0,5.0),            (1,2,4.0), (1,3,0.0), (1,4,0.0),
                                        (2,0,0.0),            (2,2,0.0), (2,3,5.0), (2,4,5.0),
                                        (3,0,0.0), (3,1,0.0),            (3,3,4.0)            ],
                                       ["user", "item", "rating"])

dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
                                            (1,0), (1,1), (1,2), (1,3), (1,4),
                                            (2,0), (2,1), (2,2), (2,3), (2,4),
                                            (3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])

# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0].asDict().values()[0]
normalize = udf(lambda x: x/colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(col('rating')))

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)

evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))

preds = defaultModelImplicit.transform(dfCountsTest2)

我不明白为什么以下不起作用。我使用相同的估计器,相同的评估器并拟合相同的数据。为什么这些可以在上面工作,但不能在 CrossValidator 内工作:

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)

【问题讨论】:

感谢您发布这个问题。在您的编辑中,为什么要使用 evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm)) 而不是 evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsTest2)) 来计算 RMSE 是不是,在使用ALS进行隐式时,我们需要对0-1范围内的隐式数据进行归一化? 【参考方案1】:

忽略技术问题,严格来说,鉴于 ALS 生成的输入带有隐式反馈,这两种方法都不正确。

您不能使用RegressionEvaluator,因为您已经知道,预测可以解释为置信度值并表示为[0, 1] 范围内的浮点数,而标签列只是一个未绑定的整数。这些值显然无法比较。 您不能使用BinaryClassificationEvaluator,因为即使预测可以解释为概率标签也不代表二元决策。此外预测列的类型无效,不能直接与BinaryClassificationEvaluator一起使用

您可以尝试转换其中一列以使输入符合要求,但从理论角度来看,这并不是一个真正合理的方法,并且引入了难以调整的额外参数。

将标签列映射到 [0, 1] 范围并使用 RMSE。

将标签列转换为具有固定阈值的二进制指示符,并扩展 ALS / ALSModel 以返回预期的列类型。假设阈值为 1 它可能是这样的

from pyspark.ml.recommendation import *
from pyspark.sql.functions import udf, col
from pyspark.mllib.linalg import DenseVector, VectorUDT

class BinaryALS(ALS):
    def fit(self, df):
        assert self.getImplicitPrefs()
        model = super(BinaryALS, self).fit(df)
        return ALSBinaryModel(model._java_obj)

class ALSBinaryModel(ALSModel):
    def transform(self, df):
        transformed = super(ALSBinaryModel, self).transform(df)
        as_vector = udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
        return transformed.withColumn(
            "rawPrediction", as_vector(col("prediction")))

# Add binary label column
with_binary = dfCounts.withColumn(
    "label_binary", (col("rating") > 0).cast("double"))

als_binary_model = BinaryALS(implicitPrefs=True).fit(with_binary)

evaluatorB = BinaryClassificationEvaluator(
    metricName="areaUnderROC", labelCol="label_binary")

evaluatorB.evaluate(als_binary_model.transform(with_binary))
## 1.0

一般来说,教科书中缺少有关使用隐式反馈评估推荐系统的材料,我建议您阅读eliasah 的answer 以了解如何评估此类推荐系统。

【讨论】:

非常感谢您的回答。我尝试了您的两个建议,并决定采用缩放和 RMSE 评估方法。它似乎运行良好,除了当我插入 CrossValidator 函数时。我认为 CrossValidator 以相同的方式使用估计器和评估器,同时自动化 k 折交叉验证并迭代网格中的参数组合。我用我的新代码(有效的东西和无效的东西)更新了我上面的帖子。您对 CrossValidator 正在做什么导致它在 fit 语句中出错有任何见解吗? 与评估者策略无关。它简单(或不那么简单)意味着不可能针对给定的数据和一组参数求解系统。 我尝试将参数网格设置为仅包含默认的估计器参数值。我仍然收到错误,所以我认为这不是参数组合。你知道哪里可能有问题吗?我想我可以设置自己的嵌套循环来通过参数组合和交叉验证来执行迭代,但我更愿意使用 Spark 的内置函数来实现。 @xenocyon 输入可以解释为项目相关的概率。所以它只是创建了一对 (P(item-irrelevant), P(item-relevant)) 这几乎是 BinaryClassifcationEvaluator 所期望的 (P(class(x) =0), P(class(x) = 1)) @zero323 我看到您的 AUC 为 1.0。当我转换它然后运行评估器时,即使我在验证数据集上获得 1.0 的 AUC。从技术上讲,人们并不期望 AUC 为 1.0 并且略低于好的模型。得到 1.0 有什么问题吗?【参考方案2】:

通过隐式反馈,我们不会让用户对我们的建议做出反应。因此,我们不能使用基于精度的指标。

在已经cited paper 中,使用了预期的百分位排名指标。

您可以尝试基于 Spark ML 库中的类似指标实现 Evaluator,并在您的交叉验证管道中使用它。

【讨论】:

【参考方案3】:

在这里聚会很晚,但我会发帖以防有人像我一样偶然发现这个问题。

在尝试将CrossValidator 与 ALS 模型一起使用时,我遇到了类似的错误。我通过将ALS 中的coldStartStrategy 参数设置为“drop”来解决它。那就是:

alsImplicit = ALS(implicitPrefs=True, coldStartStrategy="drop")

并保持其余代码相同。

我预计我的示例中发生的情况是,交叉验证拆分创建的场景是验证集中的项目未出现在训练集中,这会导致 NaN 预测值。最佳解决方案是在评估时删除 NaN 值,如 documentation 中所述。

我不知道我们是否遇到了同样的错误,所以不能保证这会解决 OP 的问题,但无论如何最好设置 coldStartStrategy="drop" 进行交叉验证。

注意:我的错误消息是“参数必须是参数映射或参数映射的列表/元组”,这似乎并不意味着存在问题使用 coldStartStrategy 参数或 NaN 值,但此解决方案解决了错误。

【讨论】:

【参考方案4】:

为了用implicitPrefs=True 交叉验证我的ALS 模型,我需要为pyspark==2.3.0 稍微调整@zero323 的答案,我遇到了以下异常:

xspy4j.Py4JException: Target Object ID does not exist for this gateway :o2733\\n\tat py4j.Gateway.invoke(Gateway.java...java:79)\\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\\n\tat java.lang.Thread.run(Thread.java:748)\\n

ALS 扩展了JavaEstimator,它提供了安装包装Java/Scala 实现的Estimators 所需的钩子。我们需要在BinaryALS 中覆盖_create_model,以便PySpark 可以保持所有Java 对象引用的直接:

import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.dataframe import DataFrame


class ALSBinaryModel(ALSModel):
    def transform(self, df: DataFrame) -> DataFrame:
        transformed = super().transform(df)
        as_vector = F.udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
        return transformed.withColumn("rawPrediction", as_vector(F.col("prediction")))


class BinaryALS(ALS):
    def fit(self, df: DataFrame) -> ALSBinaryModel:
        assert self.getImplicitPrefs()
        return super().fit(df)

    def _create_model(self, java_model) -> ALSBinaryModel:
        return ALSBinaryModel(java_model=java_model)

【讨论】:

以上是关于通过 pyspark.ml CrossValidator 调整隐式 pyspark.ml ALS 矩阵分解模型的参数的主要内容,如果未能解决你的问题,请参考以下文章

我通过使用它的 pyspark.ml.regression.LinearRegression 在 spark 中创建一个模型

在非 Spark 环境中加载 pyspark ML 模型

pyspark.ml:计算精度和召回时的类型错误

PySpark ML——分布式机器学习库

在 pyspark.ml 中运行多个功能的变压器

如何使 pyspark 和 ML(无 RDD)与大型 csv 一起工作?