如何将类型 <class 'pyspark.sql.types.Row'> 转换为 Vector

Posted

技术标签:

【中文标题】如何将类型 <class \'pyspark.sql.types.Row\'> 转换为 Vector【英文标题】:How to convert type <class 'pyspark.sql.types.Row'> into Vector如何将类型 <class 'pyspark.sql.types.Row'> 转换为 Vector 【发布时间】:2017-07-21 15:58:00 【问题描述】:

我是 Spark 的新手,目前我正在尝试使用 Python 编写一个简单的代码,对一组数据执行 KMeans。

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import re
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg import SparseVector
from numpy import array
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

import pandas as pd
import numpy
df = pd.read_csv("/<path>/Wholesale_customers_data.csv")
sql_sc = SQLContext(sc)
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
s_df = sql_sc.createDataFrame(df)
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature")
vdf = vectorAss.transform(s_df)
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||")
model = kmeans.fit(vdf)
cluster = model.clusterCenters()
print(cluster)

我将这些输入到 pyspark shell 中,当它运行 model = kmeans.fit(vdf) 时,出现以下错误:

TypeError:无法将类型转换为向量

在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 在 org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207) 在 org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 在 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:27​​7) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) 在 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:27​​5) 在 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:27​​7) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:27​​7) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 17/02/26 23:31:58 错误 执行者:阶段 23.0 (TID 113) 中的任务 6.0 中的异常 org.apache.spark.api.python.PythonException:回溯(最近 最后调用):文件 "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", 第 111 行,在 main process() 文件中 "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", 第 106 行,正在处理 serializer.dump_stream(func(split_index, 迭代器),输出文件)文件 "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", 第 263 行,在 dump_stream vs = list(itertools.islice(iterator, batch)) 文件 “/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/init.py”,第 77 行,在 _convert_to_vector raise TypeError("Cannot convert type %s into Vector" % type(l)) TypeError: Cannot convert type into Vector The

我得到的数据来自:https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv

有人可以告诉我这里出了什么问题以及我错过了什么吗?感谢您的帮助。

谢谢!

更新: @加伦 我得到的错误是:

我得到的错误是:>>> kmm = kmeans.fit(s_df)17/03/02 21:58:01 INFO BlockManagerInfo:删除了 localhost:56193 上的 broadcast_1_piece0 内存(大小:5.8 KB,免费:511.1 MB)17/03/02 21:58:01 INFO ContextCleaner:清洁的蓄能器 5 17/03/02 21:58:01 INFO BlockManagerInfo:删除了 localhost:56193 上的 broadcast_0_piece0 内存(大小:5.8 KB,免费:511.1 MB)17/03/02 21:58:01 INFO ContextCleaner: 清理累加器 4

Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件 “/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py”,第 69 行, 合身 返回 self._fit(dataset) 文件“/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py”,第 133 行, 在 _fit java_model = self._fit_java(dataset) 文件“/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py”,第 130 行, 在 _fit_java 返回self._java_obj.fit(dataset._jdf)文件“/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py”, 第 813 行,在 call 文件中 “/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py”,第 51 行,在 装饰 raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given 输入列:[Channel, Grocery, Fresh, Frozen, Detergents_Paper, 地区、熟食店、牛奶];"

【问题讨论】:

您在哪一行出现错误? 嗨 Vivek,这条线是:model = kmeans.fit(vdf) 【参考方案1】:

在 [即将弃用] spark mllib 包上独占使用 Spark 2.x ML 包:

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
df = spark.read.option("inferSchema", "true").option("header", "true").csv("whole_customers_data.csv")
cols = df.columns
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df)
kmeans = KMeans(k=2, maxIter=10, seed=1)
kmm = kmeans.fit(vdf)
kmm.clusterCenters()

【讨论】:

嗨,Garren,你能分享你的代码吗?我运行了代码并得到了错误。感谢您的帮助! @hpnhxxwn 我将在我更新的答案中分享代码。也请发布您的错误,以便其他人也可以从中学习。 @hpnhxxwn 很高兴它对你有用!请将答案标记为已接受。我怀疑某些麻烦可能来自尝试将 Spark 2.x ML 与旧的 spark mllib 混合使用。我看到您的更新说缺少“功能”,这似乎至少部分是因为您使用了没有“s”的outputCol = "feature"

以上是关于如何将类型 <class 'pyspark.sql.types.Row'> 转换为 Vector的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 错误:TypeError:不能将类型 <type 'NoneType'> 视为向量 [重复]

如何使用 PySpark 将 JSON 列类型写入 Postgres?

PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?

如何将 PySpark Dataframe 列的类型指定为 JSON

Pyspark 从 JSON 文件中获取 Schema

如何在pyspark withcolumn中使用udf和class