在pyspark中使用基于DataFrame的API在2个sparseVectors列表之间进行矩阵乘法的最佳方法是啥?

Posted

技术标签:

【中文标题】在pyspark中使用基于DataFrame的API在2个sparseVectors列表之间进行矩阵乘法的最佳方法是啥?【英文标题】:What's the best way to do matrix multiplication between 2 lists of sparseVectors with DataFrame-based API in pyspark?在pyspark中使用基于DataFrame的API在2个sparseVectors列表之间进行矩阵乘法的最佳方法是什么? 【发布时间】:2017-10-11 05:31:50 【问题描述】:

我有 2 个具有相同结构的 DataFrames:DataFrame[id: bigint, tfidf_features: vector]

我需要dataframe1 中的多行与dataframe2 中的行。我可以使用循环并执行以下操作: dataframe1.collect()[i]['tfidf_features'].dot(dataframe2.collect()[j]['tfidf_features']).

但是,我想使用矩阵乘法,相当于:np.matmul(dataframe1_tfidf_features, dataframe2_tfidf_features.T)

【问题讨论】:

【参考方案1】:

你有两个选择 1.mllib.linalg.distributed.BlockMatrix 将两个数据帧都转换为块矩阵并使用 multitply

bm1 = IndexedRowMatrix(df1.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm2 = IndexedRowMatrix(df2.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm_result = bm1.multiply(bm2)  

2。 pyspark.sql.dataframe.crossJoin 交叉连接两个数据帧并计算结果矩阵的单个元素,然后使用 collect_list & sort

arr = np.array
df =df1.crossJoin(df2.select(col("id").alias("id2"),
                                 col("features").alias("features2"))

udf_mult = udf(lambda x,y = float(arr(x).dot(arr(y).T).sum()), DoubleType()) 
df = df.withColumn("val", udf_mult("features","features2")).
                         drop("features","features2")
st = struct(["id2","val"]).alias("map")
df = df.select("id", st).groupBy("id").agg(collect_list("map").alias("list"))

def sort(x):

    x = sorted(x, key=lambda x:x[0])
    y = list(map(lambda a:a[1], x))
    return(y)
udf_sort = udf(sort, ArrayType(DoubleType()))
df = df.withColumn("list", udf_sort("list"))

【讨论】:

以上是关于在pyspark中使用基于DataFrame的API在2个sparseVectors列表之间进行矩阵乘法的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

基于pyspark中仅一列的两个DataFrame之间的差异[重复]

pyspark dataframe数据连接(join)转化为pandas dataframe基于多个字段删除冗余数据

合并多个 PySpark DataFrame 行以将基于事件的数据转换为基于人员的数据

PySpark - 如何使用连接更新 Dataframe?

PySpark Dataframe 将两列转换为基于第三列值的元组新列

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame