应用 pyspark ALS 的“recommendProductsForUsers”时出现 *** 错误(尽管集群可用 >300GB Ram)
Posted
技术标签:
【中文标题】应用 pyspark ALS 的“recommendProductsForUsers”时出现 *** 错误(尽管集群可用 >300GB Ram)【英文标题】:***-error when applying pyspark ALS's "recommendProductsForUsers" (although cluster of >300GB Ram available) 【发布时间】:2016-08-16 16:07:35 【问题描述】:寻找专业知识来指导我解决以下问题。
背景:
我正在尝试使用受this example 启发的基本 PySpark 脚本 作为部署基础架构,我使用 Google Cloud Dataproc 集群。 我的代码中的基石是函数“recommendProductsForUsers”,记录在 here 中,它为模型中的所有用户提供了*** X 产品我遇到的问题
ALS.Train 脚本运行平稳,并且在 GCP 上可以很好地扩展(轻松超过 100 万客户)。
但是,应用预测:即使用函数“PredictAll”或“recommendProductsForUsers”,根本无法扩展。我的脚本对于一个小数据集(50k 客户和 >10k 产品)
然后我得到的错误如下:
16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager:
Lost task 22.0 in stage 411.0 (TID 15139,
productrecommendation-high-w-2.c.main-nova-558.internal):
java.lang.***Error
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
我什至得到了一个 300 GB 的集群(1 个 108 GB 的主节点 + 2 个 108 GB RAM 的节点)来尝试运行它;它适用于 50k 客户,但不适用于更多客户
我的目标是建立一个可以为超过 80 万客户运行的设置
详情
失败的代码行
predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
pprint.pprint(predictions.take(10))
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()
您建议如何进行?我觉得脚本末尾的“合并”部分(即当我将其写入 dfToSave 时)会导致错误;有没有办法绕过这个并部分保存?
【问题讨论】:
【参考方案1】:从堆栈跟踪来看,这似乎与Spark gives a ***Error when training using ALS 存在相同的问题
基本上,Spark 递归地表达 RDD 沿袭,因此当在迭代工作负载过程中没有对事物进行惰性评估时,您最终会得到深度嵌套的对象。调用 sc.setCheckpointDir 并调整检查点间隔将减少此 RDD 沿袭的长度。
【讨论】:
嗨丹尼斯,感谢您的想法。我确实看到了其他线程并且我同意 ALS.train 确实有一个您可以自定义的检查点间隔参数。但是,predictAll 和 RecommendationProductsForUsers 函数都有这个参数;检查点将如何工作? 更新:实施检查点(感谢 dennis 的提示)。尽管它可以很好地扩展 ALS.train 函数(很容易超过 100 万客户),但它不适用于应用预测:即使用函数 PredictAll 或 RecommendationProductsForUsers。对此有何建议? 应用检查点后,当异常被抛出时,您是否仍然看到涉及ObjectInputStream
的相同堆栈跟踪,或者它是否已更改?
嗨丹尼斯;本质上:不再有错误,但我们应用预测的部分(即使用函数 PredictAll 或 RecommendationProductsForUsers)与其余代码相比非常慢(需要 2 小时,而 ALS.train 只需要 2 分钟)。有什么加快速度的建议吗?我现在转移到 1master+30 小作品,在这一步中似乎没有受到 CPU 和 ram 的挑战。 wdyt?
@BartV,发生这种情况的原因是,当您进行“预测”或“推荐”时,您需要在用户和产品之间创建一个交叉点。基本上,对于您的每个用户,您需要预测每个产品,并找到得分最高的产品。您正在运行多少个执行程序?在我的第一次尝试中,我发现我使用的是默认值 (3...),当我增加到 50 时(这意味着预测必须处理 50^2=2500 个任务!)我看到了性能的显着提高。
以上是关于应用 pyspark ALS 的“recommendProductsForUsers”时出现 *** 错误(尽管集群可用 >300GB Ram)的主要内容,如果未能解决你的问题,请参考以下文章
通过 pyspark.ml CrossValidator 调整隐式 pyspark.ml ALS 矩阵分解模型的参数