Converting pyspark dataframe to pandas throws org.apache.spark.SparkException: Unseen label: null [duplicate]
Posted: 2019-01-28 20:36:10
I'm using a pyspark random forest classifier and want to build a pandas dataframe from the predictions once I have them. When I try to do that, the strangest exception occurs. Here is my code:
random_forest = RandomForestClassifier(labelCol = 'label', featuresCol = 'features', maxDepth = 4, impurity = 'entropy', numTrees = 10, maxBins = 250)
rf_model = random_forest.fit(training_data)
predictions = rf_model.transform(test_data)
# Where exception happens
df = predictions.select('rawPrediction', 'label', 'prediction').where((predictions.label == '1.0') & (predictions.prediction == '0.0')).toPandas()
# The code that works fine
label_pred_train = predictions.select('label', 'prediction')
print(label_pred_train.rdd.zipWithIndex().countByKey())
The problem happens when I try to filter the predictions and select a subset of them to convert to a pandas dataframe. The same exception occurs when I replace toPandas with count, collect, and so on. What surprises me most is that when I remove that line and run the lines below it, where I use the rdd to do the counting, everything works fine and the result is returned. I have read several posts about StringIndexer and how I could use handleInvalid = 'keep', but unfortunately I am running this on Spark 2.1, and honestly I don't think it has anything to do with StringIndexer, since I am able to fit, transform, and get predictions from the model. Is there anything I might be missing here?
Here is the full exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o1040.collectToPython.
: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (string) => double)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:409)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:116)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:125)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:125)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
at scala.collection.immutable.List.exists(List.scala:84)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:125)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:140)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:138)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:138)
at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:105)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$2.apply(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$2.apply(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:230)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2742)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Unseen label: null.
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:170)
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:166)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:89)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1069)
... 75 more
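For reference, the "Caused by" line above comes from StringIndexerModel, which suggests the column the indexer reads contains nulls in test_data. A minimal diagnostic sketch (the column name raw_label is a hypothetical placeholder, not taken from the original code; substitute the actual input column of the StringIndexer):
from pyspark.sql import functions as F
# Count rows whose raw label is null; any hit would explain the 'Unseen label: null' error.
print(test_data.where(F.col('raw_label').isNull()).count())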
【Comments】:
"I don't think it has anything to do with StringIndexer, since I am able to fit, transform, and get predictions from the model" - that is not true. Spark is lazy.
@pault Well, I agree with you, but as I mentioned in the question, how can countByKey work fine while collect doesn't? They are both essentially going over all the samples, so I assumed that if there were any problem with the data, the same thing would happen when I use the rdd.
I agree with @pault - the exception is raised by StringIndexer, and that is clearly shown in the stack trace you included. You can even find the exact line - StringIndexer.scala:170
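To illustrate the laziness point raised above (a generic sketch, not from the original post): DataFrame transformations only build a logical plan, so a StringIndexer UDF sitting somewhere in the lineage of predictions has not touched any data yet; it only runs, and can only fail, once an action forces the plan to be optimized and executed.
# Transformations are lazy: this returns a DataFrame backed by a plan, evaluating no rows.
predictions = rf_model.transform(test_data)
# Only an action (show, count, collect, toPandas, ...) executes the plan,
# which is when an 'Unseen label' error from an indexer UDF in the lineage can surface.
predictions.select('label', 'prediction').show(5)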
【Answer 1】:
Use this:
df = predictions.select('rawPrediction', 'label', 'prediction').where((predictions.label == 1.0) & (predictions.prediction == 0.0)).toPandas()
instead of:
df = predictions.select('rawPrediction', 'label', 'prediction').where((predictions.label == '1.0') & (predictions.prediction == '0.0')).toPandas()
That is, use 1.0 instead of '1.0'. Most likely the comparison against the string '1.0' makes Spark look for a string where a numeric value is expected, it cannot find a match, ends up with null, and that produces the unseen-label error, I guess.
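A possibly more robust alternative (a sketch, assuming the real cause is nulls in the column that the StringIndexer reads; the column name raw_label is a hypothetical placeholder) is to clean the nulls before indexing, or - on Spark 2.2+ where the 'keep' option exists, so not on the asker's Spark 2.1 - let the indexer keep invalid values:
from pyspark.ml.feature import StringIndexer
# Option 1: drop rows whose raw label is null before the indexer ever sees them.
# 'raw_label' is a placeholder for the actual input column of the StringIndexer.
clean_data = test_data.na.drop(subset=['raw_label'])
# Option 2 (requires Spark 2.2+): map unseen/invalid values to an extra index
# instead of throwing 'Unseen label'.
indexer = StringIndexer(inputCol='raw_label', outputCol='label', handleInvalid='keep')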
【Discussion】: