为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?
Posted
技术标签:
【中文标题】为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?【英文标题】:Why these Py4JJavaError showString errors while joining Spark dataframes using pyspark?为什么在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误? 【发布时间】:2017-10-31 02:21:34 【问题描述】:我正在尝试使用 pyspark 在 Spark 中加入数据帧。这两个数据帧非常大(其中一个超过 5GB),我不断收到以下错误:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-37-d940918c3fe6> in <module>()
1 train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('type', 'locale', 'locale_name', 'description', 'transferred')
----> 2 train_holiday_oil_store_transaction_item_test_004.show()
/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py in show(self, n, truncate)
334 """
335 if isinstance(truncate, bool) and truncate:
--> 336 print(self._jdf.showString(n, 20))
337 else:
338 print(self._jdf.showString(n, int(truncate)))
/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling 012.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o873.showString.
: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:235)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:263)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:235)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
这是我的代码:
train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.join(stores_df, 'store_nbr', 'left_outer')
train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('city', 'state', 'store_type', 'cluster')
train_holiday_oil_store_transaction_item_test_004.show()
发生了什么事?什么是解决方案?
我将分区增加到 500,所以这不会是问题。
我还想知道在使用 pyspark 时加入大型数据框的典型方法是什么?有人有这种经历吗?
【问题讨论】:
请阅读minimal reproducible example并采取行动。还有How to Ask--谷歌搜索给了你什么? 如果您阅读How to Ask,它会告诉您总结您的研究,请这样做。 (所以“当然”是无关紧要的。事实是,如果他们用谷歌搜索,大多数不清楚/非 MCVE/糟糕的问题都做得很差。)请在您的问题中编辑澄清,而不是 cmetss。仍然没有minimal reproducible example,请照它说的做。特别是-完整-代码、输入、输出和预期输出。在这里,较小的测试输入会发生什么?另请在不同的问题帖子中提出您的最终问题。 【参考方案1】:如果您查看错误消息,您会看到 spark 正在调用 BroadcastHashJoin。由于数据帧很大,发送它会导致超时。 这个问题的解决方案很少
-
增加
spark.sql.broadcastTimeout
通过设置spark.sql.autoBroadcastJoinThreshold = -1
强制 spark 使用 ShuffleHashJoin
在两个数据帧上使用相同的分区器。例如,如果您有两个数据框,并且希望根据 id 列加入它们。您应该按 id 列重新分区它们
df1 = df1.repartiton("id")
df2 = df2.repartition("id")
【讨论】:
以上是关于为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?的主要内容,如果未能解决你的问题,请参考以下文章
为啥即使使用限制命令访问结果,SPARK \PYSPARK 也会计算所有内容?
当 python 函数比它们快时,为啥我们使用 pyspark UDF? (注。不用担心 spark SQL 命令)