IndexedRowMatrix().columnSimilarities() 检索到的 PySpark 相似性无法访问:INFO ExternalSorter: Thread *spilling i
Posted
技术标签:
【中文标题】IndexedRowMatrix().columnSimilarities() 检索到的 PySpark 相似性无法访问:INFO ExternalSorter: Thread *spilling in-memory map【英文标题】:PySpark similarities retrieved by IndexedRowMatrix().columnSimilarities() are not acessible: INFO ExternalSorter: Thread * spilling in-memory map 【发布时间】:2016-04-01 17:16:21 【问题描述】:当我运行代码时:
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from random import random
import os
from scipy.sparse import csc_matrix
import pandas as pd
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.sql import SQLContext
sc =SparkContext()
sqlContext = SQLContext(sc)
df = pd.read_csv("/Users/Andre/Code/blitsy-analytics/R_D/Data/cust_item_counts.csv", header=None)
customer_map = x[1]:x[0] for x in enumerate(df[0].unique())
item_map = x[1]:x[0] for x in enumerate(df[1].unique())
df[0] = df[0].map(lambda x: customer_map[x])
df[1] = df[1].map(lambda x: item_map[x])
#matrix = csc_matrix((df[2], (df[0], df[1])),shape=(max(df[0])+1, max(df[1])+1))
entries = sc.parallelize(df.apply(lambda x: tuple(x), axis=1).values)
mat = CoordinateMatrix(entries).toIndexedRowMatrix()
sim = mat.columnSimilarities()
sim.entries.map(lambda x: x).first()
我陷入了溢出到磁盘的线程循环中:
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 294
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 293
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 292
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 291
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 108 spilling in-memory
> map of 137.6 MB to disk (1 time so far) 16/04/01 12:09:42 INFO
> ExternalSorter: Thread 112 spilling in-memory map of 158.1 MB to disk
> (1 time so far) 16/04/01 12:09:42 INFO ExternalSorter: Thread 114
> spilling in-memory map of 154.2 MB to disk (1 time so far) 16/04/01
> 12:09:42 INFO ExternalSorter: Thread 113 spilling in-memory map of
> 143.4 MB to disk (1 time so far)
从返回它的第一行条目的矩阵“mat”来看,这不是真的。
这与内存管理或函数 columnSimilarity() 本身有关吗?
我在 sim 变量中有 ~86000 行和列。
我的数据集是一个元组列表(user_id、item_id、value)。我将 user_id 和 item_id 范围转换为 0 和 len(user_id| tem_id) 之间的值。这是一个 800000 的 id 并不会强制矩阵那么大。
有 800,000 个此类条目。变量 'mat' 中的矩阵保存来自 (user_id, item_id) 坐标的元组的值。经我核实,确实如此。
“mat”的矩阵有约 41,000 个用户和约 86,000 个项目。列相似性创建每个项目之间的比较,这就是它的尺寸为 86k x 86k
这一切都是在 pyspark 终端 ./bin/pyspark 中完成的。
【问题讨论】:
使用的是哪个版本的 spark ?您的集群配置是什么? 你可以试试 1.6 分支吗?确保它不只是在 2.0 版本中。该版本尚未正式发布。 不幸的是,它并不是那么直截了当。特别是因为泄漏似乎是由这个函数触发的。必须先进行调查。 还有一个问题:您可以添加有关数据集的信息吗?体积 ?格式。行数和列数等,所以我可以尝试重现错误。 还有你的提交命令! :) 【参考方案1】:正如评论中所讨论的,该问题与考虑到您的集群配置没有很好分区的大量数据有关。这就是它溢出到磁盘上的原因。
您需要在内存方面为您的应用程序提供更多资源和/或增加数据分区。
【讨论】:
以上是关于IndexedRowMatrix().columnSimilarities() 检索到的 PySpark 相似性无法访问:INFO ExternalSorter: Thread *spilling i的主要内容,如果未能解决你的问题,请参考以下文章
如果 x['column 1] = y['column 1'] 然后用 python 或 R 中的 y['column 2'] 中的值替换该值