将 RDD 转换为 kmeans 的有效输入

Posted

技术标签:

【中文标题】将 RDD 转换为 kmeans 的有效输入【英文标题】:Transform RDD to valid input for kmeans 【发布时间】:2018-03-31 17:49:33 【问题描述】:

我正在使用包含 csv 文件的目录的 spark mllib 算法计算 TF 和 IDF,代码如下:

import argparse
from os import system

### args parsing
parser = argparse.ArgumentParser(description='runs TF/IDF on a directory of 
text docs')
parser.add_argument("-i","--input", help="the input in HDFS",  
required=True)
parser.add_argument("-o", '--output', help="the output in  HDFS", 
required=True )
parser.add_argument("-mdf", '--min_document_frequency', default=1 )
args = parser.parse_args()


docs_dir = args.input
d_out = "hdfs://master:54310/" + args.output
min_df = int(args.min_document_frequency)

# import spark-realated stuff
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

sc = SparkContext(appName="TF-IDF")

# Load documents (one per line).
documents = sc.textFile(docs_dir).map(lambda title_text: 
title_text[1].split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

# IDF
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

#print(tfidf.collect())

#save
tfidf.saveAsTextFile(d_out)

使用

print(tfidf.collect())

我得到这个输出:

[SparseVector(1048576, 812399: 4.3307), SparseVector(1048576, 411697: 
0.0066), SparseVector(1048576, 411697: 0.0066), SparseVector(1048576, 
411697: 0.0066), SparseVector(1048576, 411697: 0.0066), ....

我还测试了 KMeans mllib 算法:

from __future__ import print_function

import sys

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

runs=4

def parseVector(line):
return np.array([float(x) for x in line.split(' ')])

if __name__ == "__main__":
if len(sys.argv) != 3:
    print("Usage: kmeans <file> <k>", file=sys.stderr)
    exit(-1)
sc = SparkContext(appName="KMeans")
lines = sc.textFile(sys.argv[1])
data = lines.map(parseVector)
k = int(sys.argv[2])
model = KMeans.train(data, k, runs)
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
sc.stop()

使用这个示例测试用例

0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

而且效果很好。

现在我想在 KMeans 算法中应用上面 tfidf 的 rdd 输出,但我不知道如何像上面的示例文本一样转换 rdd,或者如何在 KMeans 算法中正确拆分 rdd 到正常工作。

我真的需要一些帮助。

更新

我真正的问题是如何从这样的文本文件中读取输入以将其应用于 KMeans mllib

(1048576,[155412,857472,756332],[1.75642010278,2.41857747478,1.97365255252])
(1048576,[159196,323305,501636],[2.98856378408,1.63863706713,2.44956728334])
(1048576,[135312,847543,743411],[1.42412015238,1.58759872958,2.01237484818])

更新2

我完全不确定,但我认为我需要从上面的向量转到下面的数组,以便将它直接应用于 KMeans mllib 算法

1.75642010278 2.41857747478 1.97365255252
2.98856378408 1.63863706713 2.44956728334
1.42412015238 1.58759872958 2.01237484818

【问题讨论】:

【参考方案1】:

IDF 的输出是SparseVector 的数据框。 KMeans 将向量作为输入(稀疏或密集),因此,不需要进行任何转换。您应该能够直接使用来自 IDF 的输出列作为 KMeans 的输入。

如果您需要在运行 TFIDF 和 KMeans 之间将数据保存到磁盘,我建议您通过数据帧 API 将其保存为 csv。

首先使用Row转换为数据框:

from pyspark.sql import Row

row = Row("features")    # column name
df = tfidf.map(row).toDF()

另一种无需导入即可转换的方法:

df = tfidf.map(lambda x: (x, )).toDF(["features"])

转换后将数据框保存为 parquet 文件:

df.write.parquet('/path/to/save/file')

要读取数据,只需使用:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet('/path/to/file')

# converting from dataframe into an RDD[Vector]
data = df.rdd.map(list)

如果您在任何情况下都需要从保存为字符串的向量进行转换,这也是可能的。下面是一些示例代码:

from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

df = sc.parallelize(["(7,[1,2,4],[1,1,1])"]).toDF(["features"])

parse = udf(lambda s: Vectors.parse(s), VectorUDT())
df.select(parse("features"))

首先创建一个具有相同格式的示例数据框。然后使用UDF 将字符串解析为向量。如果您想要一个 rdd 而不是数据框,请使用上面“从镶木地板读取”部分的代码进行转换。


但是,IDF 的输出非常稀疏。这些向量的长度为 1048576,其中只有一个的值大于 1。KMeans 不会给您任何有趣的结果。

我建议您改为查看word2vec。它将为每个单词提供一个更紧凑的向量,并且对这些向量进行聚类会更有意义。使用这种方法,您可以接收到单词到它们的向量表示的映射,该映射可用于聚类。

【讨论】:

以上是关于将 RDD 转换为 kmeans 的有效输入的主要内容,如果未能解决你的问题,请参考以下文章

将地图 RDD 转换为数据框

数据帧到 RDD[Row] 用空值替换空间

Spark算子:RDD基本转换操作–mapflatMapdistinct

如何将三个 RDD 加入一个元组?

将 OpenCV cv::kmeans() 与一维输入一起使用

RDD之三:RDD创建方式