Create a Python transformer on sparsevector data type column in Pyspark ML
Posted: 2016-03-09 18:13:15

Question:

I have a dataframe with a column 'features' (each row of the dataframe represents a document). I used HashingTF to compute the column 'tf', and I also created a custom transformer 'TermCount' (just as a test) to compute 'total_terms', as follows:
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.feature import HashingTF
from pyspark.ml.util import keyword_only
from pyspark.mllib.linalg import SparseVector
from pyspark.sql.functions import udf
class TermCount(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(TermCount, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        def f(s):
            return len(s.values)

        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f)(in_col))
sc = SparkContext()
sqlContext = SQLContext(sc)
documents = sqlContext.createDataFrame([
    (0, "w1 w2 w3 w4 w1 w1 w1"),
    (1, "w2 w3 w4 w2"),
    (2, "w3 w4 w3"),
    (3, "w4")], ["doc_id", "doc_text"])

df = (documents
      .map(lambda x: (x.doc_id, x.doc_text.split(" ")))
      .toDF()
      .withColumnRenamed("_1", "doc_id")
      .withColumnRenamed("_2", "features"))
htf = HashingTF(inputCol="features", outputCol="tf")
tf = htf.transform(df)
term_count_model=TermCount(inputCol="tf", outputCol="total_terms")
tc_df=term_count_model.transform(tf)
tc_df.show(truncate=False)
#+------+----------------------------+------------------------------------------------+-----------+
#|doc_id|features |tf |total_terms|
#+------+----------------------------+------------------------------------------------+-----------+
#|0 |[w1, w2, w3, w4, w1, w1, w1]|(262144,[3738,3739,3740,3741],[4.0,1.0,1.0,1.0])|4 |
#|1 |[w2, w3, w4, w2] |(262144,[3739,3740,3741],[2.0,1.0,1.0]) |3 |
#|2 |[w3, w4, w3] |(262144,[3740,3741],[2.0,1.0]) |2 |
#|3 |[w4] |(262144,[3741],[1.0]) |1 |
#+------+----------------------------+------------------------------------------------+-----------+
Now, I need to add a similar transformer that takes 'tf' as its inputCol and computes the document frequency of each term (no_of_rows_contains_this_term / total_no_of_rows) into an outputCol of SparseVector type, so that I end up with a result like this:
+------+----------------------------+------------------------------------------------+-----------+----------------------------------------------------+
|doc_id|features |tf |total_terms| doc_freq |
+------+----------------------------+------------------------------------------------+-----------+----------------------------------------------------+
|0 |[w1, w2, w3, w4, w1, w1, w1]|(262144,[3738,3739,3740,3741],[4.0,1.0,1.0,1.0])|4 |(262144,[3738,3739,3740,3741],[0.25,0.50,0.75,1.0]) |
|1 |[w2, w3, w4, w2] |(262144,[3739,3740,3741],[2.0,1.0,1.0]) |3 |(262144,[3739,3740,3741],[0.50,0.75,1.0]) |
|2 |[w3, w4, w3] |(262144,[3740,3741],[2.0,1.0]) |2 |(262144,[3740,3741],[0.75,1.0]) |
|3 |[w4] |(262144,[3741],[1.0]) |1 |(262144,[3741],[1.0]) |
+------+----------------------------+------------------------------------------------+-----------+----------------------------------------------------+
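For reference, a minimal sketch (outside of any Transformer) that reproduces these doc_freq values from the tf DataFrame built above might look like the following; the variable names here are illustrative only:

from operator import add

total_docs = tf.count()  # 4 documents in this example

# For every hashed term index, count in how many documents it occurs,
# then divide by the total number of documents.
doc_freq_map = (tf.select("tf")
    .flatMap(lambda row: row[0].indices)   # term indices of one document (already unique)
    .map(lambda i: (int(i), 1))
    .reduceByKey(add)
    .mapValues(lambda n: n / float(total_docs))
    .collectAsMap())

# Expected to be something like {3738: 0.25, 3739: 0.5, 3740: 0.75, 3741: 1.0}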
Comments:

I used the idea for the Python transformer from @zero323.

Answer 1:

Leaving all the wrapper code aside, you can try using Statistics.colStats:
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
tf_col = "x"
dataset = sc.parallelize([
    "(262144,[3738,3739,3740,3741],[0.25,0.50,0.75,1.0])",
    "(262144,[3738,3739,3740,3741],[0.25,0.50,0.75,1.0])"
]).map(lambda s: (Vectors.parse(s), )).toDF(["x"])

vs = (dataset.select(tf_col)
      .flatMap(lambda x: x)
      .map(lambda v: Vectors.sparse(v.size, v.indices, [1.0 for _ in v.values])))
stats = Statistics.colStats(vs)
document_frequency = stats.mean()
document_frequency.max()
## 1.0
document_frequency.min()
# 0.0
document_frequency.nonzero()
## (array([3738, 3739, 3740, 3741]),)
Once you have this information, you can easily adjust it for the indices you need:
from pyspark.mllib.linalg import VectorUDT
from pyspark.sql.functions import col, udf

df = Vectors.sparse(
    document_frequency.shape[0], document_frequency.nonzero()[0],
    document_frequency[document_frequency.nonzero()]
)

def idf(v):
    # df (the document-frequency vector above) is available via closure
    values = ...  # Compute new values from df and v
    return Vectors.sparse(v.size, v.indices, values)

dataset.withColumn("idf_col", udf(idf, VectorUDT())(col(tf_col)))
One big caveat is that stats.mean returns a DenseVector, so if your TF has 262144 features the output is an array of the same length.
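Putting those pieces together, one possible end-to-end sketch is a DocumentFrequency transformer written in the same style as the TermCount class from the question; the class name, the doc_freq output column, and the choice to capture the dense mean array in a closure are assumptions, not part of the original answer:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import keyword_only
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import col, udf

class DocumentFrequency(Transformer, HasInputCol, HasOutputCol):
    """Hypothetical transformer: maps a TF SparseVector column to a SparseVector
    holding, for each term present in the document, its document frequency
    (number of documents containing the term / total number of documents)."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(DocumentFrequency, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        in_col, out_col = self.getInputCol(), self.getOutputCol()

        # Binarize every TF vector and take the per-column mean:
        # the mean of 0/1 values is the fraction of documents containing the term.
        binarized = (dataset.select(in_col)
                     .flatMap(lambda x: x)
                     .map(lambda v: Vectors.sparse(
                         v.size, v.indices, [1.0 for _ in v.values])))
        doc_freq = Statistics.colStats(binarized).mean()  # dense array (see caveat above)

        def to_doc_freq(v):
            # Keep only the indices that are present in this row's TF vector.
            return Vectors.sparse(v.size, v.indices,
                                  [float(doc_freq[i]) for i in v.indices])

        return dataset.withColumn(out_col, udf(to_doc_freq, VectorUDT())(col(in_col)))

# Usage on the DataFrame produced earlier:
# doc_freq_model = DocumentFrequency(inputCol="tf", outputCol="doc_freq")
# doc_freq_model.transform(tc_df).show(truncate=False)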
Comments:

You only need a udf here.

Oh, thank you @zero323, you're so kind. dataset is unchanged after calling the idf() function; it still contains one column instead of two.
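Regarding that last comment: withColumn does not modify a DataFrame in place, it returns a new DataFrame, so the result has to be assigned back (the names below are the ones used in the answer above):

dataset = dataset.withColumn("idf_col", udf(idf, VectorUDT())(col(tf_col)))
dataset.columns  # now lists both "x" and "idf_col"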