Pyspark: K-means clustering error at model fitting
Posted: 2020-09-19

While running K-means clustering with PySpark, I am using the following lines of code to find the optimal value of K, but an error keeps popping up at the model-fitting line.
The preprocessing stage consisted of removing NAs and label encoding:
from pyspark.sql.functions import when, col

# Encode column "Potential" from dataframe df
high = list(range(86, 101))
middle = list(range(71, 86))
low = list(range(56, 71))
very_low = list(range(45, 56))

# Drop rows where "Potential" is NULL
df = df.where(col("Potential").isNotNull())

# Encode the "Potential" column into grade buckets
df1 = df.withColumn(
    "Potential_Grading",
    when(col("Potential").isin(high), "high")
    .when(col("Potential").isin(middle), "Middle")
    .when(col("Potential").isin(low), "Low")
    .when(col("Potential").isin(very_low), "Verylow"),
)
df_kmeans = df1.select(
    "ID", "Height(CM)", "Weight(KG)", "Crossing", "Finishing", "HeadingAccuracy",
    "ShortPassing", "Volleys", "Dribbling", "Curve", "FKAccuracy", "LongPassing",
    "BallControl", "Acceleration", "SprintSpeed", "Agility", "Reactions", "Balance",
    "ShotPower", "Jumping", "Stamina", "Strength", "LongShots", "Aggression",
    "Interceptions", "Vision", "Penalties", "Composure", "Positioning", "Marking",
    "StandingTackle", "SlidingTackle", "Potential_Grading",
)
FEATURES_COL1 = [
    'Height(CM)', 'Weight(KG)', 'Crossing', 'Finishing', 'HeadingAccuracy',
    'ShortPassing', 'Volleys', 'Dribbling', 'Curve', 'FKAccuracy', 'LongPassing',
    'BallControl', 'Acceleration', 'SprintSpeed', 'Agility', 'Reactions',
    'Balance', 'ShotPower', 'Jumping', 'Stamina', 'Strength', 'LongShots',
    'Aggression', 'Interceptions', 'Vision', 'Penalties', 'Positioning',
    'Composure', 'Marking', 'StandingTackle', 'SlidingTackle',
]
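Note that the NULL filter above only covers the "Potential" column; the feature columns themselves may still contain nulls. One way to verify (a diagnostic sketch, not in the original code) is to count nulls per feature column:

from pyspark.sql.functions import col, count, when

# Count nulls per feature column; any non-zero count will break the default
# VectorAssembler setting (handleInvalid="error")
df_kmeans.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in FEATURES_COL1]
).show(truncate=False)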
from pyspark.ml.feature import VectorAssembler

# VectorAssembler is a transformer that takes all of the columns listed in
# FEATURES_COL1 and combines them into a single new vector column.
vecAssembler = VectorAssembler(inputCols=FEATURES_COL1, outputCol="features")
df_kmeans_1 = vecAssembler.transform(df_kmeans).select('ID', 'features')
The lines of code for fitting K-means:
import numpy as np
from pyspark.ml.clustering import KMeans

cost = np.zeros(21)
for k in range(2, 21):
    # Train a k-means model for this value of k
    kmeans = KMeans().setK(k).setFeaturesCol("features").setSeed(1)
    model1 = kmeans.fit(df_kmeans_1)
    # Evaluate the clustering by computing the Within Set Sum of Squared Errors
    cost[k] = model1.computeCost(df_kmeans_1)
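For context, once the loop runs successfully the cost array would be plotted against k to read off the elbow. A minimal sketch of that step, assuming matplotlib is available (note that computeCost was deprecated in Spark 3.0 in favor of pyspark.ml.evaluation.ClusteringEvaluator):

import matplotlib.pyplot as plt

# Plot WSSSE against k; the "elbow" of the curve suggests a reasonable cluster count
ks = list(range(2, 21))
plt.plot(ks, cost[2:21], marker='o')
plt.xlabel('k')
plt.ylabel('Within Set Sum of Squared Errors')
plt.title('Elbow curve for k-means')
plt.show()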
The error at the model-fitting line:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-15-2840daf75485> in <module>()
8 # Trains a k-means model.
9 kmeans = KMeans().setK(k).setFeaturesCol("features").setSeed(1)
---> 10 model1 = kmeans.fit(df_kmeans_1)
11
12 #Evaluate clustering by computing Within Set Sum of Squared Errors.
5 frames
/content/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o346.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 18, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<Height(CM):double,Weight(KG):double,Crossing_double_VectorAssembler_f596de6f26ae:double,Finishing_double_VectorAssembler_f596de6f26ae:double,HeadingAccuracy_double_VectorAssembler_f596de6f26ae:double,ShortPassing_double_VectorAssembler_f596de6f26ae:double,Volleys_double_VectorAssembler_f596de6f26ae:double,Dribbling_double_VectorAssembler_f596de6f26ae:double,Curve_double_VectorAssembler_f596de6f26ae:double,FKAccuracy_double_VectorAssembler_f596de6f26ae:double,LongPassing_double_VectorAssembler_f596de6f26ae:double,BallControl_double_VectorAssembler_f596de6f26ae:double,Acceleration_double_VectorAssembler_f596de6f26ae:double,SprintSpeed_double_VectorAssembler_f596de6f26ae:double,Agility_double_VectorAssembler_f596de6f26ae:double,Reactions_double_VectorAssembler_f596de6f26ae:double,Balance_double_VectorAssembler_f596de6f26ae:double,ShotPower_double_VectorAssembler_f596de6f26ae:double,Jumping_double_VectorAssembler_f596de6f26ae:double,Stamina_double_VectorAssembler_f596de6f26ae:double,Strength_double_VectorAssembler_f596de6f26ae:double,LongShots_double_VectorAssembler_f596de6f26ae:double,Aggression_double_VectorAssembler_f596de6f26ae:double,Interceptions_double_VectorAssembler_f596de6f26ae:double,V...
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:220)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
... 29 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:572)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.takeSample(RDD.scala:561)
at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:386)
at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:282)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:251)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:362)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Can anyone help me figure out the error?
Answer 1:

Good question! Add the handleInvalid parameter to the VectorAssembler so that rows with missing values are skipped:
vecAssembler = VectorAssembler(
    inputCols=FEATURES_COL1,
    outputCol="features",
    handleInvalid="skip",  # skip the rows with missing values
)
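If you want to confirm what the skip is doing, you can compare row counts before and after the transform (a quick check, not part of the original answer):

# Rows dropped by handleInvalid="skip" are exactly those with at least one null feature
before = df_kmeans.count()
after = vecAssembler.transform(df_kmeans).count()
print(f"Dropped {before - after} rows containing nulls")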
Answer 2:

From your logs:
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
You have null values in the rows being assembled by
vecAssembler = VectorAssembler(inputCols=FEATURES_COL1, outputCol="features")
Fill in the null values in your data first, or use the vector assembler's handleInvalid parameter; see the Official Doc for details.
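If you would rather fill the nulls than drop those rows, one option (a sketch using pyspark.ml.feature.Imputer, which this answer does not itself mention) is to impute each feature column with its mean before assembling:

from pyspark.ml.feature import Imputer

# Impute nulls in every feature column with that column's mean
# (Imputer requires the input columns to be double/float typed)
imputed_cols = [c + "_imputed" for c in FEATURES_COL1]  # hypothetical output names
imputer = Imputer(inputCols=FEATURES_COL1, outputCols=imputed_cols, strategy="mean")
df_imputed = imputer.fit(df_kmeans).transform(df_kmeans)

# Assemble the imputed columns instead of the raw ones
vecAssembler = VectorAssembler(inputCols=imputed_cols, outputCol="features")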
Comments:
For anyone else still getting this error: in my case it was because I used StandardScaler() before VectorAssembler() + KMeans. For some reason, KMeans was not happy with me scaling the values, converting them to floats, and then applying VectorAssembler(). I had to keep my scaled values as vector type going into the VectorAssembler; yes, even though VectorAssembler already converts the data into vectors. I had to do: vector data type -> VectorAssembler = vector features column.
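For reference, the ordering that usually avoids this kind of trouble is to assemble first and scale afterwards, since StandardScaler expects a single vector input column. A minimal pipeline sketch (my own illustration, with k=5 chosen arbitrarily):

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Assemble the raw numeric columns into one vector, skipping rows with nulls
assembler = VectorAssembler(
    inputCols=FEATURES_COL1, outputCol="features", handleInvalid="skip"
)
# StandardScaler consumes the assembled vector column, not the raw columns
scaler = StandardScaler(
    inputCol="features", outputCol="scaled_features", withMean=True, withStd=True
)
# Cluster on the scaled vectors
kmeans = KMeans(featuresCol="scaled_features", k=5, seed=1)

pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(df_kmeans)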