为啥这个 PySpark 加入失败?

Posted

技术标签:

【中文标题】为啥这个 PySpark 加入失败?【英文标题】:Why does this PySpark join fail?为什么这个 PySpark 加入失败? 【发布时间】:2018-01-15 20:01:05 【问题描述】:

我误解了以下示例中 PySpark 的性能。

我有几个 DataFrame,因此我加入了它们。

print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()

data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()

data4.write.parquet(output + '/test.parquet', mode="overwrite")

我希望leftouter 加入将返回左侧 DataFrame 以及来自右侧 DataFrame 的匹配项(如果有)。

Soma 样本输出:

users_data
+--------------+----------+-------------------------+
|   category_pk|   item_pk|             unique_users|
+--------------+----------+-------------------------+
|           321|       460|                        1|
|           730|       740|                        2|
|           140|       720|                       10|


users_cat_data
+--------------+-----------------------+
|   category_pk|   unique_users_per_cat|
+--------------+-----------------------+
|           111|                    258|
|           100|                    260|
|           750|                      9|

但是,我观察到了不同的行为。我使用show() 打印出我在加入操作中使用的所有DataFrame 的前5 行。所有 DataFrame 都包含数据。但我收到以下错误:

None
DATA1
Traceback (most recent call last):
  File "mytest.py", line 884, in <module>
    args.field1, args.field2, args.field3)
  File "mytest.py", line 802, in calc
    print data1.show()
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 336, in show
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o802.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 

Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)

我不明白为什么在print data1.show() 行出现任务序列化错误。用于创建ata1 的DataFrame 不为空。另外,show()在这行代码上面2行成功使用。

有时它会在最后一行 data4.write.parquet(output + '/test.parquet', mode="overwrite") 失败,当我删除它时,它运行良好。但现在它甚至更早地在data1.show() 行上失败了。

如何解决这个问题。任何帮助将不胜感激。

【问题讨论】:

根据您的堆栈跟踪 - users_cat_data 实际上是 None 吗?前 2 个 show() 打印成什么? @ayplam:不,users_cat_data 不为空。 DATA1 之前没有空的。另外,我不知道None 指的是什么。当我使用show() 时,它无处不在,但随后show() 打印出内容。所以,我完全错过了。 @ayplam:我为users_datausers_cat_data 添加了一些示例输出。 DataFrame calc 包含更多列,但也可以毫无问题地打印出来。 【参考方案1】:

认为最上面的org.apache.spark.SparkException: Exception thrown in awaitResult 的原因是,在请求BroadcastExchangeExec 物理运算符广播关系(又名)时,它只是超时了(在默认的 5 分钟等待完成后)。

这就是异常含义的底层背景。

现在,您可能会问自己,为什么会发生这种情况?

spark.sql.broadcastTimeout 设置为-1 以完全禁用超时(这将导致线程无限期地等待广播完成)或将其增加到10 分钟左右。

您还可以通过将spark.sql.autoBroadcastJoinThreshold 设置为-1 来禁用广播表格。

但是,这只会解决您的环境中发生的更严重的问题。

我的猜测是您的 YARN 集群(由 /mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001 猜测)资源紧张,网络也可能很慢。

总而言之,我的猜测是您查询中的某些表低于默认的 10MB,这导致 Spark SQL 优化器选择广播(而不是通过执行器分发数据集的其他方式)。 p>

认为集群中发生了一些更严重的事情,您面临一些临时问题,直到......管理员修复了 YARN 集群。当您提交 PySpark 应用程序时,集群是否会承受更多负载?

我不明白为什么会出现任务序列化错误

认为您可以简单地将其视为早期问题的副作用,因为 PySpark 如何通过套接字通信的两个进程(即 Python 和 JVM)在幕后工作。

【讨论】:

非常感谢。我正在阅读你的答案。到目前为止,我可以告诉你,我总是在spark-submit 中使用以下配置:--conf spark.sql.broadcastTimeout=1000 1000 秒?那是15分钟左右,不是吗?我怀疑驱动程序机器和执行程序之间的网络很慢(呃)。检查 ping 等。 当集群负载更大并且我有几个并行的 Spark 作业在不同的队列中运行时,确实会发生此错误。非常感谢您的帮助。

以上是关于为啥这个 PySpark 加入失败?的主要内容,如果未能解决你的问题,请参考以下文章

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

当 RDD 包含用户定义的类时,为啥 Apache PySpark top() 会失败?

Pyspark,执行者在执行连接时失去连接

为啥在 pyspark 中加入两个临时视图后删除列不起作用,但它适用于数据框连接?

为啥 df.limit 在 Pyspark 中不断变化?

Pyspark:与使用 pandas 创建数据帧相比,为啥使用 pyspark 创建数据帧需要更多时间 [重复]