Exception thrown in awaitResult while converting pyspark df to pandas

Posted: 2021-04-09 01:33:35

I am trying to do some calculations using a UDF. After the calculation, when I try to convert the pyspark dataframe to pandas, it gives me:
org.apache.spark.SparkException: Exception thrown in awaitResult:
Here is code to reproduce the issue.
import pandas as pd
import numpy as np
import time
n = 10000
sample_df = pd.DataFrame(np.random.rand(n,n))
sample_df.columns = sample_df.columns.astype(str)
sample_df.index = sample_df.index.astype(str)
sample_df.loc['start'] = np.random.rand(n)
sample_df.loc['null'] = np.random.rand(n)
sample_df.loc['conv'] = np.random.rand(n)
sample_df["start"] = 0.0
sample_df["null"] = np.random.rand(sample_df.shape[0])
sample_df["conv"] = np.random.rand(sample_df.shape[0])
sample_df.index.name = 'from'
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
channels = [channel for channel in sample_df.columns if channel not in ['start', 'null', 'conv']]
channels_df = spark.createDataFrame(channels, StringType()).toDF(*['channel'])
from pyspark.sql.functions import udf
@udf("float")
def removal_effects_udf(channel):
    global sample_df
    conversion_rate=0.0313
    removal_df = sample_df.drop(channel, axis=1).drop(channel, axis=0)
    row_sum = pd.DataFrame(float(1) - removal_df.sum(axis=1), columns = ["value"])
    null_pct = row_sum[row_sum['value']!=0].reset_index()
    null_pct.set_index('from', inplace=True)
    removal_df['null'] = removal_df.index.to_series().map(null_pct['value']).fillna(removal_df['null'])
    removal_df.loc['null']['null'] = 1.0
    removal_to_conv = removal_df[['null', 'conv']].drop(['null', 'conv'], axis=0)
    removal_to_non_conv = removal_df.drop(['null', 'conv'], axis=1).drop(['null', 'conv'], axis=0)
    removal_inv_diff = np.linalg.inv(np.identity(len(removal_to_non_conv.columns)) - np.asarray(removal_to_non_conv))
    removal_dot_prod = np.dot(removal_inv_diff, np.asarray(removal_to_conv))
    removal_cvr = pd.DataFrame(removal_dot_prod, index=removal_to_conv.index)[[1]].loc['start'].values[0]
    removal_effect = 1 - removal_cvr / conversion_rate
    return float(removal_effect)
channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()
channels_df_pandas = channels_df.toPandas()
After running this, I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1959014423699154> in <module>
----> 1 channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
106 # Rename columns to avoid duplicated column names.
107 tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108 batches = self.toDF(*tmp_column_names)._collect_as_arrow()
109 if len(batches) > 0:
110 table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
244 finally:
245 # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246 jsocket_auth_server.getResult()
247
248 # Separate RecordBatches from batch order indices in results
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o32146.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 160 in stage 1221.0 failed 4 times, most recent failure: Lost task 160.3 in stage 1221.0 (TID 161215, 10.0.1.18, executor 582): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
... 38 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:115)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)
at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:62)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:62)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
... 38 more
How can I fix this?

I am using Databricks; the cluster has 488 cores | 1.75 TB | Spark 3.0.0.

Edit:

With the solution provided by user @wwnde, I still get an error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1959014423698939> in <module>
5
6 spark.conf.set("spark.sql.execution.arrow.enabled", "true")
----> 7 channels_df_pandas = channels_df.select("*").toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
106 # Rename columns to avoid duplicated column names.
107 tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108 batches = self.toDF(*tmp_column_names)._collect_as_arrow()
109 if len(batches) > 0:
110 table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
244 finally:
245 # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246 jsocket_auth_server.getResult()
247
248 # Separate RecordBatches from batch order indices in results
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o32697.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 1223.0 failed 4 times, most recent failure: Lost task 7.3 in stage 1223.0 (TID 161666, 10.0.1.26, executor 597): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 644, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/databricks/spark/python/pyspark/worker.py", line 463, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/databricks/spark/python/pyspark/worker.py", line 254, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/databricks/spark/python/pyspark/worker.py", line 76, in read_command
command = serializer.loads(command.value)
File "/databricks/spark/python/pyspark/broadcast.py", line 154, in value
self._value = self.load_from_path(self._path)
File "/databricks/spark/python/pyspark/broadcast.py", line 131, in load_from_path
return self.load(f)
File "/databricks/spark/python/pyspark/broadcast.py", line 137, in load
return pickle.load(file)
OSError: [Errno 12] Cannot allocate memory
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:585)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
at java.lang.Thread.run(Thread.java:748)
Question comments: (none)

Answer 1:

import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
channels_df_pandas = channels_df.select("*").toPandas()
Comments:
It gives a similar error: OSError: [Errno 12] Cannot allocate memory @wwnde
This also appears somewhere in the error @wwnde: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 187 in stage 1224.0 failed 4 times, most recent failure: Lost task 187.3 in stage 1224.0 (TID 162043, 10.0.1.16, executor 608): ExecutorLostFailure (executor 608 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
If anything, I would suggest using the Databricks Community Edition. You will have access to Spark and it is scalable.
I am using Databricks with a cluster of 488 cores | 1.75 TB | Spark 3.0.0 @wwnde
Not sure @Vishal

Answer 2:
The main problem with your code is that you are using the toPandas function, which effectively brings all of the data to the driver node - the total amount of memory and cores in the cluster does not matter here, the driver node size is the main bottleneck (although you can of course increase the driver node size). I also see that you refer to a global variable inside the UDF - in theory it should be broadcast, but that is still bad practice.
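For reference, a rough sketch of explicit broadcasting is shown below. The name bc_sample_df and the placeholder return value are illustrative additions, not part of the answer, and broadcasting a pandas DataFrame of this size to every executor is itself heavy, so the redesign described next remains the real fix.

# Sketch only -- assumes the question's spark session, imports, and sample_df are in scope.
# Ship the lookup DataFrame to the executors once as an explicit broadcast variable
# instead of letting the UDF capture a module-level global.
bc_sample_df = spark.sparkContext.broadcast(sample_df)

@udf("float")
def removal_effects_udf(channel):
    local_df = bc_sample_df.value  # executor-local copy of the broadcast DataFrame
    removal_df = local_df.drop(channel, axis=1).drop(channel, axis=0)
    # ... the rest of the removal-effect computation from the question, using local_df ...
    return float(removal_df.sum().sum())  # placeholder return for this sketch only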
To really fix the problem, you need to rework the approach so that the code stays fully distributed:
- Get rid of toPandas - it is better to write the results out somewhere and then access them another way (see the sketch after this list); it looks like you simply have too much data.
- Don't use global variables.
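One possible shape of "write the results somewhere" is sketched below, assuming channels_df is still the Spark DataFrame from before the toPandas() call; the output path is a hypothetical placeholder.

# Sketch: keep the result as a distributed Spark DataFrame and persist it,
# instead of collecting everything onto the driver with toPandas().
result_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel")))

# "/mnt/output/removal_effects" is a made-up path -- point this at your own storage.
result_df.write.mode("overwrite").parquet("/mnt/output/removal_effects")

# Read it back later (or query only the part you need) without a huge driver-side collect.
spark.read.parquet("/mnt/output/removal_effects").show(10)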
Also, it is recommended to use the faster Pandas UDFs instead of "plain" UDFs; a rough sketch of that follows as well.
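The sketch below shows what a Series-to-Series pandas UDF version might look like (Spark 3.0 type-hint style); the body is simplified with a placeholder computation, and bc_sample_df refers to the hypothetical broadcast variable from the earlier sketch.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def removal_effects_pudf(channels: pd.Series) -> pd.Series:
    local_df = bc_sample_df.value  # fetched once per batch, not once per row

    def one_channel(channel):
        removal_df = local_df.drop(channel, axis=1).drop(channel, axis=0)
        # ... the same removal-effect math as the question's UDF would go here ...
        return float(removal_df.sum().sum())  # placeholder return for this sketch only

    return channels.apply(one_channel)

result_df = channels_df.withColumn("removal_effect", removal_effects_pudf(F.col("channel")))

Pandas UDFs exchange data with the JVM via Arrow and process whole batches per call, which usually cuts serialization overhead compared with row-at-a-time UDFs.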
Comments: (none)