python - How to convert an RDD of dense vectors into a DataFrame in PySpark?
Posted: 2016-12-26 09:05:26

I have an RDD of DenseVector like this:
>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
I want to convert it into a DataFrame. I tried this:
>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
It gives an error like this:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>
Old solution:
frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))
Edit 1 - reproducible code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')
sentenceData = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()
vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
Answer 1: You cannot convert RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
Otherwise Spark will try to convert the object's __dict__ and end up using an unsupported NumPy array as a field:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import _infer_schema
v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError Traceback (most recent call last)
...
TypeError: not supported type: <class 'numpy.ndarray'>
compared with:
_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))
Notes:

In Spark 2.0 you have to use the correct local types:

- pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
- pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.

These two namespaces are no longer compatible and require explicit conversions (e.g. How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT).
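For illustration, a minimal sketch of such an explicit conversion, assuming Spark 2.0+, where the mllib vector classes provide asML() and pyspark.mllib.linalg.Vectors provides fromML():

from pyspark.mllib.linalg import Vectors as MLlibVectors

mllib_v = MLlibVectors.dense([1.0, 2.0, 3.0])  # pyspark.mllib.linalg.DenseVector
ml_v = mllib_v.asML()                          # pyspark.ml.linalg.DenseVector
back = MLlibVectors.fromML(ml_v)               # back to the mllib type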
The code provided in the edit is not equivalent to the one from the original question. You should be aware that tuple and list don't have the same semantics. If you map a vector into a pair, use tuple and convert directly to DataFrame:
tfidf.rdd.map(
    lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF()
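Note that toDF() without arguments assigns the default column names _1 and _2 here; explicit names can be passed instead, e.g. toDF(["label", "rawfeatures"]).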
Using tuple (product type) would work for nested structure as well, but I doubt this is what you want:
(tfidf.rdd
    .map(lambda row: (row[0], DenseVector(row[1].toArray())))
    .map(lambda x: (x, ))
    .toDF())
A list at any place other than the top-level row is interpreted as an ArrayType.
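To see that inference on a list (reusing the internal _infer_schema helper from above), note how it becomes ArrayType rather than a vector field:

# a list below the top-level row is inferred as ArrayType, not a struct
_infer_schema(([1.0, 2.0, 3.0], ))
# StructType(List(StructField(_1,ArrayType(DoubleType,true),true)))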
It is much cleaner to use a UDF for the conversion (see Spark Python: Standard scaler error "Do not support ... SparseVector").
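A minimal sketch of such a UDF, assuming a DataFrame df with a column "features" holding pyspark.mllib vectors that should become pyspark.ml vectors:

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf

# wrap the mllib -> ml conversion in a UDF and apply it to the column
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
converted = df.withColumn("features", as_ml("features"))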
Answer 2: I think the problem here is that createDataFrame does not take a DenseVector as an argument. Try converting the DenseVector to the corresponding collection (i.e. an array or a list). In Scala and Java a toArray() method is available; you can convert the DenseVector to an array or a list and then try to create the DataFrame.
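In PySpark, a minimal sketch of that idea, assuming frequencyDenseVectors is the original RDD of DenseVector from the question (toArray() returns a NumPy array, so tolist() is needed to get a plain Python list that Spark can infer):

# turn each DenseVector into a plain Python list, then build the DataFrame
frequencyDenseVectors.map(lambda v: (v.toArray().tolist(), )).toDF(["rawfeatures"])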