Pyspark: K-means clustering error at model fitting

Posted: 2020-09-19 05:40:23

【Question】

While running K-means clustering with PySpark, I am using the following lines of code to find the optimal K value, but an error keeps popping up at the model-fitting line.

The preprocessing stage consisted of removing NAs and label encoding:

from pyspark.sql.functions import when,col

#Encode column "Potential" from dataframe df
high = list(range(86,101))
middle = list(range(71, 86))
low = list(range(56, 71))
very_low = list(range(45,56))


#remove rows where "Potential" is NULL
df = df.where(col("Potential").isNotNull())

#encoding the column potential
df1 = df.withColumn("Potential_Grading", when(col("Potential").isin(high), "High").
                    when(col("Potential").isin(middle), "Middle").
                    when(col("Potential").isin(low), "Low").
                    when(col("Potential").isin(very_low),"Verylow"))
df_kmeans = df1.select("ID","Height(CM)","Weight(KG)", "Crossing", "Finishing", "HeadingAccuracy", "ShortPassing", 
          "Volleys", "Dribbling", "Curve", "FKAccuracy", "LongPassing", "BallControl", "Acceleration", 
          "SprintSpeed", "Agility", "Reactions", "Balance", "ShotPower", "Jumping", "Stamina", "Strength", 
          "LongShots", "Aggression", "Interceptions", "Vision", "Penalties", "Composure", "Positioning",
          "Marking", "StandingTackle", "SlidingTackle","Potential_Grading")

FEATURES_COL1 = ['Height(CM)', 'Weight(KG)', 
                      'Crossing', 'Finishing', 'HeadingAccuracy', 
                      'ShortPassing', 'Volleys', 'Dribbling', 'Curve',
                      'FKAccuracy', 'LongPassing', 'BallControl', 
                      'Acceleration', 'SprintSpeed', 'Agility', 
                      'Reactions', 'Balance', 'ShotPower', 'Jumping', 
                      'Stamina', 'Strength', 'LongShots', 'Aggression',
                      'Interceptions', 'Vision', 'Penalties','Positioning',
                      'Composure', 'Marking', 'StandingTackle', 'SlidingTackle']

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler


#VectorAssembler is a transformer that combines all of the columns listed in FEATURES_COL1 into a single new vector column.
vecAssembler = VectorAssembler(inputCols=FEATURES_COL1, outputCol="features")

df_kmeans_1 = vecAssembler.transform(df_kmeans).select('ID','features')

The code for fitting K-means:

import numpy as np

cost = np.zeros(21)
for k in range(2, 21):
    # Trains a k-means model.
    kmeans = KMeans().setK(k).setFeaturesCol("features").setSeed(1)
    model1 = kmeans.fit(df_kmeans_1)

    # Evaluate clustering by computing Within Set Sum of Squared Errors.
    cost[k] = model1.computeCost(df_kmeans_1)
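
Once the fit succeeds, the plan is to pick the best K from the elbow of this cost curve; a minimal matplotlib sketch of that final step (matplotlib and the plot itself are not part of the original code):

import matplotlib.pyplot as plt

# Plot WSSSE cost against K; the "elbow" of the curve suggests the best K.
fig, ax = plt.subplots()
ax.plot(range(2, 21), cost[2:21], marker='o')
ax.set_xlabel('K')
ax.set_ylabel('WSSSE (cost)')
plt.show()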

The error that appears at the model-fitting line:

    ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-2840daf75485> in <module>()
      8   # Trains a k-means model.
      9   kmeans = KMeans().setK(k).setFeaturesCol("features").setSeed(1)
---> 10   model1 = kmeans.fit(df_kmeans_1)
     11 
     12   #Evaluate clustering by computing Within Set Sum of Squared Errors.

5 frames
/content/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o346.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 18, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<Height(CM):double,Weight(KG):double,Crossing_double_VectorAssembler_f596de6f26ae:double,Finishing_double_VectorAssembler_f596de6f26ae:double,HeadingAccuracy_double_VectorAssembler_f596de6f26ae:double,ShortPassing_double_VectorAssembler_f596de6f26ae:double,Volleys_double_VectorAssembler_f596de6f26ae:double,Dribbling_double_VectorAssembler_f596de6f26ae:double,Curve_double_VectorAssembler_f596de6f26ae:double,FKAccuracy_double_VectorAssembler_f596de6f26ae:double,LongPassing_double_VectorAssembler_f596de6f26ae:double,BallControl_double_VectorAssembler_f596de6f26ae:double,Acceleration_double_VectorAssembler_f596de6f26ae:double,SprintSpeed_double_VectorAssembler_f596de6f26ae:double,Agility_double_VectorAssembler_f596de6f26ae:double,Reactions_double_VectorAssembler_f596de6f26ae:double,Balance_double_VectorAssembler_f596de6f26ae:double,ShotPower_double_VectorAssembler_f596de6f26ae:double,Jumping_double_VectorAssembler_f596de6f26ae:double,Stamina_double_VectorAssembler_f596de6f26ae:double,Strength_double_VectorAssembler_f596de6f26ae:double,LongShots_double_VectorAssembler_f596de6f26ae:double,Aggression_double_VectorAssembler_f596de6f26ae:double,Interceptions_double_VectorAssembler_f596de6f26ae:double,V...
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:220)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
    ... 29 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
    at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:572)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.takeSample(RDD.scala:561)
    at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:386)
    at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:282)
    at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:251)
    at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:362)
    at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
    at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
    at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<Height(CM):double,Weight(KG):double,Crossing_double_VectorAssembler_f596de6f26ae:double,Finishing_double_VectorAssembler_f596de6f26ae:double,HeadingAccuracy_double_VectorAssembler_f596de6f26ae:double,ShortPassing_double_VectorAssembler_f596de6f26ae:double,Volleys_double_VectorAssembler_f596de6f26ae:double,Dribbling_double_VectorAssembler_f596de6f26ae:double,Curve_double_VectorAssembler_f596de6f26ae:double,FKAccuracy_double_VectorAssembler_f596de6f26ae:double,LongPassing_double_VectorAssembler_f596de6f26ae:double,BallControl_double_VectorAssembler_f596de6f26ae:double,Acceleration_double_VectorAssembler_f596de6f26ae:double,SprintSpeed_double_VectorAssembler_f596de6f26ae:double,Agility_double_VectorAssembler_f596de6f26ae:double,Reactions_double_VectorAssembler_f596de6f26ae:double,Balance_double_VectorAssembler_f596de6f26ae:double,ShotPower_double_VectorAssembler_f596de6f26ae:double,Jumping_double_VectorAssembler_f596de6f26ae:double,Stamina_double_VectorAssembler_f596de6f26ae:double,Strength_double_VectorAssembler_f596de6f26ae:double,LongShots_double_VectorAssembler_f596de6f26ae:double,Aggression_double_VectorAssembler_f596de6f26ae:double,Interceptions_double_VectorAssembler_f596de6f26ae:double,Vision_double_VectorAssembler_f596de6f26ae:double,Penalties_double_VectorAssembler_f596de6f26ae:double,Positioning_double_VectorAssembler_f596de6f26ae:double,Composure_double_VectorAs...
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:220)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
    ... 29 more

Can anyone help me figure out this error?


【Answer 1】:

Good question!

Add the handleInvalid parameter to the VectorAssembler to skip the rows with missing values:

vecAssembler = VectorAssembler(
    inputCols=FEATURES_COL1
    , outputCol="features"
    , handleInvalid="skip" # skip the rows with missing values
)
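
With that change, re-running the transform and the loop from the question should work. A quick sketch (reusing df_kmeans from the question) to check how many rows the "skip" option drops:

df_kmeans_1 = vecAssembler.transform(df_kmeans).select('ID', 'features')

# Every dropped row contained a null in at least one feature column.
print(df_kmeans.count() - df_kmeans_1.count())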


【Answer 2】:

From your log:

Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".

You have null values in the rows being assembled by vecAssembler = VectorAssembler(inputCols=FEATURES_COL1, outputCol="features").
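
To locate which feature columns actually contain nulls, a quick diagnostic sketch (assuming df_kmeans and FEATURES_COL1 from the question):

from pyspark.sql import functions as F

# Count the null entries in each feature column.
null_counts = df_kmeans.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in FEATURES_COL1]
)
null_counts.show()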

Fill in the null values in your data first, or use the VectorAssembler's handleInvalid parameter; see the Official Doc for details.
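
A minimal sketch of the "fill the nulls first" route, using pyspark.ml.feature.Imputer; this assumes mean imputation is acceptable for these attributes and that the feature columns are already double/float (Imputer requires numeric floating-point columns):

from pyspark.ml.feature import Imputer

# Replace nulls in every feature column with that column's mean.
imputer = Imputer(inputCols=FEATURES_COL1, outputCols=FEATURES_COL1, strategy="mean")
df_kmeans = imputer.fit(df_kmeans).transform(df_kmeans)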

【Comments】:

For anyone else still getting this error: in my case it was because I used StandardScaler() before VectorAssembler() + KMeans. For some reason, KMeans was unhappy with me scaling the values, converting them to floats, and then running VectorAssembler(). I had to keep my scaled values as the vector type before the VectorAssembler, yes, even though VectorAssembler already converts the data to vectors. In short: vector data type -> VectorAssembler = vector features column. See the sketch below.
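
A minimal sketch of the usual ordering (assemble into a vector first, then scale that vector, then cluster); the column name raw_features and the value k=4 are placeholders, not from the original post:

from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import KMeans

# Assemble the raw numeric columns into a single vector column first...
assembler = VectorAssembler(inputCols=FEATURES_COL1, outputCol="raw_features", handleInvalid="skip")
df_assembled = assembler.transform(df_kmeans)

# ...then scale the assembled vector, and cluster on the scaled vector.
scaler = StandardScaler(inputCol="raw_features", outputCol="features")
df_scaled = scaler.fit(df_assembled).transform(df_assembled)

model = KMeans(featuresCol="features", k=4, seed=1).fit(df_scaled)  # k=4 is just a placeholder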
