PySpark - 如何根据 CoordinateMatrix 中表示的相似点获取前 k 个 ID?

Posted

技术标签:

【中文标题】PySpark - 如何根据 CoordinateMatrix 中表示的相似点获取前 k 个 ID?【英文标题】:PySpark - How to obtain top-k ids based on their similarites represented in a CoordinateMatrix? 【发布时间】:2018-01-08 09:14:21 【问题描述】:

我有一个数据字典(键代表项目(1,2,3..是项目的 ID),它们的值('712907','742068')指的是用户)。我将其转换为熊猫数据框:

data_dict = 0: ['712907','742068','326136','667386'],
             1: ['667386','742068','742068'],
             2: ['326136', '663056', '742068','742068'],
            3: ['326136', '663056', '742068'],4: ['326116','742068','663056', '742068'],5: ['326136','326136','663056', '742068']
df= pd.DataFrame.from_dict(data_dict, orient='index')

我根据用户('712907'、'742068'、'326136'..)对数据框中的项目进行分组,见下图。

dframe = pd.get_dummies(df.stack()).sum(level=0)
sv = sparse.csr_matrix(dframe.as_matrix())

请注意,上面的数据帧(dframe)只是一个小例子,实际的 dframe 大小是(309235 x 81566)。因此,我想使用 spark 来计算 sv (稀疏矩阵)中行(1,2,3...)之间的余弦相似度。 这是我到目前为止所取得的成就:

from pyspark.sql import SQLContext
from pyspark.sql.types import Row
sc = pyspark.SparkContext(appName="cosinesim")
sqlContext = SQLContext(sc)
sv_rdd = sc.parallelize(sv.toarray())

使用example,我将 rdd 转换为数据框:

def f(x):
    d = 
    for i in range(len(x)):
        d[str(i)] = int(x[i])
    return d
dfspark = sv_rdd.map(lambda x: Row(**f(x))).toDF()

在example 之后,我添加了一个新的“id”列:

row_with_index = Row(*["id"] + dfspark.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(dfspark.columns)

dfidx = (dfspark.rdd
    .zipWithIndex()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + dfspark.schema.fields)))

最后,通过转置矩阵计算行之间的相似度:

pred = IndexedRowMatrix(dfidx.rdd.map(lambda row: IndexedRow(row.id,row[1:])))
pred1 = pred.toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred1.columnSimilarities()

如何根据余弦相似度(pred_sims)获得每个项目 0、1、2、3、4 的前 k 个 ID? 我将 CoordinateMatrix 转换为数据框,但不确定如何访问每个 id 的前 k 个项目..

columns = ['from', 'to', 'sim']
vals = pred_sims.entries.map(lambda e: (e.i, e.j, e.value)).collect()
dfsim = sqlContext.createDataFrame(vals, columns)
dfsim.show()

from pyspark.sql.functions import col, desc
for i in range(m):
    target_id = int(dataset_u[i])
    dfFrom = dfsim.where((col("from") == target_id))
.....

【问题讨论】:

如果您尝试解决的问题实际上只是选择每个 id 的前 k 个项目,那么问题的第一部分似乎无关紧要 - 您可以从 dfsim 开始并从那里提问吗?还是我错过了什么?... 第一部分只是展示了我如何生成一个大型稀疏矩阵(csr_matrix),然后将其转换为 rdd 以计算行之间的余弦相似度。 'pred_sims' 是坐标矩阵的上三角形。我想根据它们的相似性获得每个项目的前 k 个项目.. 【参考方案1】:

您可以使用窗口函数按每个项目的相似度排序,然后使用 row_count():

from pyspark.sql.window import Window

window = Window.partitionBy(dfsim['from']).orderBy(dfsim['sim'].desc())

dfsim.select('*', func.row_number().over(window).alias('row_number')) \
  .filter(func.col('row_number') <= 3) \
  .show()
+----+---+------------------+----------+
|from| to|               sim|row_number|
+----+---+------------------+----------+
|   0|  1|0.6708203932499369|         1|
|   0|  5|0.6123724356957946|         2|
|   4|  5|0.5000000000000001|         1|
|   1|  4|0.7302967433402215|         1|
|   1|  2|0.7302967433402215|         2|
|   2|  3|0.9428090415820636|         1|
|   2|  4|0.8333333333333336|         2|
|   3|  5|0.9428090415820636|         1|
|   3|  4|0.7071067811865477|         2|
+----+---+------------------+----------+

如果您需要将行选择与原始数据框相关联,请加入原始数据。

【讨论】:

以上是关于PySpark - 如何根据 CoordinateMatrix 中表示的相似点获取前 k 个 ID?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何根据其他行值的值更改行+列的值

如何根据行的内容拆分pyspark数据框

如何根据 PySpark 中窗口聚合的条件计算不同值?

pyspark如何根据值添加选定的列

如何根据条件在pyspark中跨连续行保留值

如何根据pyspark中的条件组合dataFrame中的行