Why these Py4JJavaError showString errors while joining Spark dataframes using pyspark?

Posted: 2017-10-31 02:21:34

【Question】

I am trying to join DataFrames in Spark using PySpark. The two DataFrames are very large (one of them is over 5 GB), and I keep getting the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-37-d940918c3fe6> in <module>()
      1 train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('type', 'locale', 'locale_name', 'description', 'transferred')
----> 2 train_holiday_oil_store_transaction_item_test_004.show()

/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    334         """
    335         if isinstance(truncate, bool) and truncate:
--> 336             print(self._jdf.showString(n, 20))
    337         else:
    338             print(self._jdf.showString(n, int(truncate)))

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o873.showString.
: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:235)
    at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:263)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:235)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Here is my code:

train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.join(stores_df, 'store_nbr', 'left_outer')
train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('city', 'state', 'store_type', 'cluster')
train_holiday_oil_store_transaction_item_test_004.show()

What is happening, and what is the solution?

I have already increased the number of partitions to 500, so that should not be the problem.

I would also like to know: what is the typical way to join large DataFrames in PySpark? Has anyone dealt with this before?

【Comments】

Please read minimal reproducible example and act on it. Also How to Ask -- what did searching Google give you?

If you read How to Ask, it tells you to summarize your research; please do so. (So "of course" is beside the point. The fact is that most unclear / non-MCVE / poorly written questions were poorly researched, if googled at all.) Please clarify by editing your question, not in comments. There is still no minimal reproducible example; please do what it says, in particular give the complete code, input, output, and expected output. What happens here with a smaller test input? Also, please ask your final question in a separate question post.

【Answer 1】

If you look at the error message, you can see that Spark is performing a BroadcastHashJoin. Because the DataFrame is large, broadcasting it causes the 300-second timeout. You can confirm which join strategy Spark chose before triggering the action, as shown in the sketch below; the numbered items after it are the usual fixes.
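
As a quick check (not part of the original answer), the physical plan can be printed before calling an action; a minimal sketch, assuming the DataFrames from the question are already defined in the session:

    # Hedged sketch: print the physical plan instead of triggering the job.
    # 'store_nbr' is the join key shown in the question; explain() only
    # prints the plan, it does not execute the join.
    joined = train_holiday_oil_store_transaction_item_test_004.join(
        stores_df, 'store_nbr', 'left_outer')
    joined.explain()
    # If the plan contains "BroadcastHashJoin", Spark is trying to broadcast
    # one side of the join, which is what times out here.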

    1. Increase spark.sql.broadcastTimeout.

    2. Force Spark to use a ShuffleHashJoin instead of a broadcast join by setting spark.sql.autoBroadcastJoinThreshold = -1 (see the configuration sketch after this list).

    3. Use the same partitioner on both DataFrames. For example, if you have two DataFrames and want to join them on an id column, repartition both of them by that column:

    df1 = df1.repartition("id")
    df2 = df2.repartition("id")
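
For options 1 and 2, the settings can be changed on the SparkSession before running the join; a minimal sketch, assuming a SparkSession named spark (the 36000-second timeout is an arbitrary illustrative value, not part of the original answer):

    # Option 1: raise the broadcast timeout (the default is 300 seconds,
    # which is the limit hit in the stack trace above).
    spark.conf.set("spark.sql.broadcastTimeout", "36000")

    # Option 2: disable automatic broadcast joins so Spark falls back to a
    # shuffle-based join instead of trying to broadcast a large table.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

After setting autoBroadcastJoinThreshold to -1, re-running explain() on the join should show a shuffle-based join (for example SortMergeJoin or ShuffledHashJoin) instead of the BroadcastHashJoin that was timing out.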

【Discussion】

If the above did not solve your problem, the following related questions may help:

Why does Spark/PySpark compute everything even when the results are accessed with a limit command?

Why do we use PySpark UDFs when plain Python functions are faster? (Note: not concerned with Spark SQL commands.)

Why does this Python code work in PySpark but not with spark-submit?

Why does this PySpark join fail?

Error when joining DataFrames on PySpark 2

PySpark self-join to create network data