pyspark: Why can't I see the actual values when I print a DataFrame? I just get DataFrame[avg(Rate): double]


I am trying to return the average of a column. When I print the result, I don't see the actual value.

My DataFrame (before the code below):

+-------+------------+--------+------------------+
|Private|Applications|Accepted|              Rate|
+-------+------------+--------+------------------+
|    Yes|         417|     349|0.8369304556354916|
|    Yes|        1899|    1720|0.9057398630858347|
|    Yes|        1732|    1425|0.8227482678983834|
|    Yes|         494|     313|0.6336032388663968|
|     No|        3540|    2001|0.5652542372881356|
|     No|        7313|    4664|0.6377683577191303|
|    Yes|         619|     516|0.8336025848142165|
|    Yes|         662|     513|0.7749244712990937|
|    Yes|         761|     725|0.9526938239159002|
|    Yes|        1690|    1366| 0.808284023668639|
|    Yes|        6075|    5349|0.8804938271604938|
|    Yes|         632|     494|0.7816455696202531|
|     No|        1208|     877|0.7259933774834437|
|    Yes|       20192|   13007|0.6441660063391442|
|    Yes|        1436|    1228|0.8551532033426184|
|    Yes|         392|     351|0.8954081632653061|
|    Yes|       12586|    3239|0.2573494358811378|
|    Yes|        1011|     604|0.5974282888229476|
|    Yes|         848|     587|0.6922169811320755|
|    Yes|        8728|    5201|0.5958982584784601|
+-------+------------+--------+------------------+

Here is my code:

from pyspark.sql.functions import avg, col

privateRate = df.filter(df["Private"] == "Yes").agg(avg(col("Rate")))

print(privateRate)

# prints:
# DataFrame[avg(Rate): double]

I have also tried:

privateRate.show()

# fails with a long error

The error looks like this:

Traceback (most recent call last):
  File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 40, in <module>
    privateRate.show()
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 380, in show
  File "c:\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
  File "c:\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o110.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
    apps = int(fields[2])
IndexError: list index out of range


        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)

        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)

        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)

        at org.apache.spark.scheduler.Task.run(Task.scala:123)

        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)

        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)

        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)

        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)

        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)

        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)

        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)

        at scala.Option.foreach(Option.scala:257)

        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)

        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)

        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)

        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)

        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)

        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)

        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)

        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)

        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)

        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)

        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)

        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)

        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)

        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)

        at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)

        at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)

        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)

        at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

        at py4j.Gateway.invoke(Gateway.java:282)

        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

        at py4j.commands.CallCommand.execute(CallCommand.java:79)

        at py4j.GatewayConnection.run(GatewayConnection.java:238)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
    apps = int(fields[2])
IndexError: list index out of range


        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)

        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)

        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)

        at org.apache.spark.scheduler.Task.run(Task.scala:123)

        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)

        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

1 more


20/02/09 14:25:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/02/09 14:25:21 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "c:\spark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/jacob/OneDrive/Documents/Machine Learning 2/M5/M5IndividualPt1.py", line 17, in parseLine
    apps = int(fields[2])
IndexError: list index out of range

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
20/02/09 14:25:22 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job

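I have seen long errors like this many times, and I wonder whether they have less to do with my code and more to do with my personal environment?

Answer

Spark uses lazy evaluation. This means Spark waits until the last possible moment to execute the graph of computation instructions. So when you run this statement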

privateRate = df.filter(df["Private"] == "Yes").agg(avg(col("Rate")))

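all Spark does is build a plan for carrying out your transformations; it never actually executes them. That is why you see no error at this point, even if something is wrong with your data. The catch comes when you call .show() on the result. That call moves you from the level of logical transformations to the level of actions. An action instructs Spark to compute the result from the series of transformations you defined above. Now, looking at your error message, your actual error appears to be here: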

IndexError: list index out of range
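The same delayed failure can be reproduced without Spark. The sketch below is only an analogy, not Spark itself: it uses Python's built-in lazy map() in place of a Spark transformation and list() in place of an action. The function parse_line and the sample lines are made up for illustration; they merely echo the int(fields[2]) call shown in the traceback.

```python
def parse_line(line):
    # Hypothetical parser, mirroring the failing call in the traceback.
    fields = line.split(",")
    return int(fields[2])


# Made-up input: the second line has only two fields.
lines = ["College A,Yes,417", "College B,No"]

# map() is lazy, much like a Spark transformation: nothing is parsed yet,
# so the malformed line causes no error here.
parsed = map(parse_line, lines)
print(parsed)  # shows a map object, not the parsed values

# Forcing evaluation plays the role of an action such as .show().
# Only now does the bad line raise the exception.
error_message = None
try:
    values = list(parsed)
except IndexError as exc:
    error_message = str(exc)

print(error_message)
```

Printing the lazy object shows only a description of it, just as print(privateRate) shows only the schema, and the parsing error surfaces at the step that forces evaluation rather than at the line that defined the computation.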

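Unfortunately, without seeing your whole code and understanding the structure of your table, it is hard to say what causes this error. But hopefully this answer helps you focus your attention on where to look.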
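The traceback does show that the exception is raised in parseLine at apps = int(fields[2]), which suggests that at least one input line splits into fewer than three fields (a blank or truncated line, for example). A minimal sketch of a more tolerant parser follows. The comma delimiter, the field layout, the name parse_line, and the sample lines are all assumptions, since the original parseLine and input file are not shown.

```python
def parse_line(line, min_fields=3):
    """Return (private, apps), or None when the line cannot be parsed.

    The field positions are hypothetical; adjust them to the real file.
    """
    fields = line.strip().split(",")
    if len(fields) < min_fields:
        return None  # too few fields, e.g. a blank or truncated line
    try:
        return fields[1], int(fields[2])
    except ValueError:
        return None  # the third field is not an integer, e.g. a header row


# Made-up sample input with one blank line and one truncated line.
lines = [
    "College A,Yes,417,349",
    "",
    "College B,No,3540,2001",
    "College C,Yes",
]

results = [parse_line(line) for line in lines]
good_rows = [row for row in results if row is not None]
bad_line_numbers = [i for i, row in enumerate(results) if row is None]

print(good_rows)
print(bad_line_numbers)
```

In a Spark job the same idea would probably take the form of mapping the RDD with such a function and then filtering out the None results, but listing the rejected line numbers first makes it easier to see what is actually wrong with the input.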
