pyspark: Why can't I see the actual value when I print a DataFrame? I only get DataFrame[avg(Rate): double]
I am trying to return the average of a column. When I print the result, I don't see the actual value.
My DataFrame (before the code below):
+-------+------------+--------+------------------+
|Private|Applications|Accepted|              Rate|
+-------+------------+--------+------------------+
|    Yes|         417|     349|0.8369304556354916|
|    Yes|        1899|    1720|0.9057398630858347|
|    Yes|        1732|    1425|0.8227482678983834|
|    Yes|         494|     313|0.6336032388663968|
|     No|        3540|    2001|0.5652542372881356|
|     No|        7313|    4664|0.6377683577191303|
|    Yes|         619|     516|0.8336025848142165|
|    Yes|         662|     513|0.7749244712990937|
|    Yes|         761|     725|0.9526938239159002|
|    Yes|        1690|    1366| 0.808284023668639|
|    Yes|        6075|    5349|0.8804938271604938|
|    Yes|         632|     494|0.7816455696202531|
|     No|        1208|     877|0.7259933774834437|
|    Yes|       20192|   13007|0.6441660063391442|
|    Yes|        1436|    1228|0.8551532033426184|
|    Yes|         392|     351|0.8954081632653061|
|    Yes|       12586|    3239|0.2573494358811378|
|    Yes|        1011|     604|0.5974282888229476|
|    Yes|         848|     587|0.6922169811320755|
|    Yes|        8728|    5201|0.5958982584784601|
+-------+------------+--------+------------------+
Here is my code:
from pyspark.sql.functions import avg, col

# Average Rate over the rows where Private == "Yes"
privateRate = df.filter(df["Private"] == "Yes").agg(avg(col("Rate")))
print(privateRate)
# prints:
# DataFrame[avg(Rate): double]
I also tried:
privateRate.show()
# raises a long error
The error looks like this:
Traceback (most recent call last):
File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 40, in <module>
privateRate.show()
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 380, in show
File "c:\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "c:\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o110.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
return f(*args, **kwargs)
File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
apps = int(fields[2])
IndexError: list index out of range
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
return f(*args, **kwargs)
File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
apps = int(fields[2])
IndexError: list index out of range
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
1 more
20/02/09 14:25:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/02/09 14:25:21 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
return f(*args, **kwargs)
File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
apps = int(fields[2])
IndexError: list index out of range
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/02/09 14:25:22 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
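I have seen long errors like this several times, and I wonder whether they have less to do with my code and more to do with my local environment?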
Answer
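Spark uses lazy evaluation. This means Spark waits until the last possible moment to execute the graph of computation instructions. So when you run this statement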
privateRate = df.filter(df["Private"] == "Yes").agg(avg(col("Rate")))
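all Spark does is build a plan for carrying out your transformations; it never actually executes them. That is why you see no error at this point even if something is wrong with your data. Likewise, print(privateRate) is not an action: it only prints the DataFrame's schema, which is why you get DataFrame[avg(Rate): double] instead of a number. The failure surfaces when you call .show() on the result, because that moves you from the level of logical transformations to the level of actions. An action instructs Spark to compute a result from the chain of transformations you defined above. Now, looking at your error message, your actual error appears to be here: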
IndexError: list index out of range
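The way a bad record stays hidden until an action runs can be imitated in plain Python. The sketch below is only an analogy and not Spark code: Python's built-in map() is lazy in a similar spirit to a Spark transformation, and parse_line together with the sample rows are hypothetical stand-ins for the asker's parseLine and input file.

```python
# Plain-Python analogy (not Spark): map() is lazy, much like a transformation.

def parse_line(line):
    # Hypothetical parser: assumes at least three comma-separated fields.
    fields = line.split(",")
    return int(fields[2])

# Made-up input; the second row is malformed on purpose.
lines = ["Yes,417,349", "bad row"]

# Building the pipeline runs nothing yet, so no error is raised here.
pipeline = map(parse_line, lines)

# Consuming the pipeline (the "action") is what finally calls parse_line.
try:
    results = list(pipeline)
    error = None
except IndexError as exc:
    results = None
    error = exc

print(repr(error))
```

The IndexError appears only at list(pipeline), not at map(...), which mirrors how the Spark job fails at .show() rather than at the line that defines privateRate.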
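Unfortunately, without seeing the full code and knowing the structure of your table, it is hard to say what causes this error. The traceback does show that it is raised by int(fields[2]) in your own parseLine function (line 17 of your script), so it comes from your code and data rather than from your environment. Hopefully this answer helps you focus on where to look.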
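Since the traceback points at int(fields[2]) inside parseLine, one plausible cause is an input line with fewer fields than expected, such as a blank line or a line using a different delimiter. The following is a minimal sketch of a more tolerant parser, assuming a comma-separated file; the column order, the sep parameter and the sample rows are assumptions made for illustration and are not taken from the original script.

```python
def parse_line(line, sep=","):
    """Return (private, apps, accepted), or None if the line is malformed.

    The three-column layout is a hypothetical example, not the asker's schema.
    """
    fields = line.strip().split(sep)
    if len(fields) < 3:
        # Too few fields: this is the case that raised IndexError before.
        return None
    try:
        return fields[0], int(fields[1]), int(fields[2])
    except ValueError:
        # Non-numeric values, for example a header row.
        return None

# Made-up sample: a header, one good row, a blank line, and a short row.
sample = ["Private,Applications,Accepted", "Yes,417,349", "", "No,3540"]
parsed = [parse_line(line) for line in sample]
good = [row for row in parsed if row is not None]
print(good)
```

In a Spark job the same idea could be applied with something like rdd.map(parse_line).filter(lambda r: r is not None), where rdd is a hypothetical RDD of raw text lines. Counting the rejected rows first is usually worthwhile, so that bad data is noticed rather than silently dropped.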