Apache Spark Codegen Stage 超过 64 KB

Posted

技术标签:

【中文标题】Apache Spark Codegen Stage 超过 64 KB【英文标题】:Apache Spark Codegen Stage grows beyond 64 KB 【发布时间】:2018-11-26 05:45:04 【问题描述】:

当我对 30 多列进行特征工程以创建大约 200 多列时,我遇到了一个错误。它没有使工作失败,但错误显示。我想知道如何避免这种情况。

Spark - 2.3.1 Python - 3.6

集群配置 - 1 主 - 32 GB RAM,16 核 4 从 - 16 GB RAM,8 核

输入数据 - 具有快速压缩的 8 个 parquet 文件分区。

我的 Spark 提交 ->

spark-submit --master spark://192.168.60.20:7077 --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf spark.scheduler.listenerbus.eventqueue.capacity=20000 --conf spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

下面的堆栈跟踪 -

ERROR CodeGenerator:91 - failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
    at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.ProjectExec.doExecute(basicPhysicalOperators.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.ProjectExec.doExecute(basicPhysicalOperators.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:107)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:102)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:43)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:97)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2924)
    at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB

【问题讨论】:

你在做什么操作? spark.conf.set("spark.sql.codegen.wholeStage", false) 试试这个属性 【参考方案1】:

问题是使用Catalyst从使用DataFrame和Dataset的程序生成的Java程序编译成Java字节码时,一个方法的字节码大小不能超过64KB,这与Java类的限制冲突文件,这是发生的异常。

隐藏错误:

spark.sql.codegen.wholeStage= "false"

解决方法:

为了避免由于上述限制而发生异常,在Spark内部,一个解决方案是在Catalyst生成Java程序时,将编译和制作可能超过64 KB的Java字节码的方法拆分为多个方法。已经完成了。

在管道中使用持久化或任何其他逻辑分离

【讨论】:

有同样的错误,persist/cache 对我有用 此配置是否适用于 Spark 2.3.2 版本? 更改此配置会影响性能,因为它会禁用一些底层优化,如 here 所述【参考方案2】:

正如 vaquar 所写,在管道中引入逻辑分离应该会有所帮助。

切断血统并在计划中引入突破的一种方法似乎是 DF -&gt; RDD -&gt; DF来回转换:

df = spark_session.sparkContext.createDataFrame(df.rdd, schema=df.schema)

在《高性能 Spark》一书中,他们进一步提到最好(更快)使用底层 Java RDD,即使用

j_rdd = df._jdf.toJavaRDD() 及其架构 j_schema = df._jdf.schema() 构造一个新的 Java DataFrame 并最终将其转换回 PySpark DataFrame:

sql_ctx = df.sql_ctx
java_sql_context = sql_ctx._jsqlContext
new_java_df = java_sql_context.createDataFrame(j_rdd, j_schema)
new_df = DataFrame(new_java_df, sql_ctx)

【讨论】:

【参考方案3】:

如果您使用的是 pyspark 2.3+,请尝试

spark = SparkSession.builder.master('local').appName('tow-way')\
        .config('spark.sql.codegen.wholeStage', 'false')\ ## <-- add this line
        .getOrCreate()

【讨论】:

【参考方案4】:

我们通过在代码中添加额外的“检查点”解决了这个错误。

检查点 = 在我们的案例 s3 中,您需要将数据帧(数据)写回磁盘,然后在新的数据帧中将其读回,这会导致清空 JVM spark 容器并使用新代码重新启动

检查点详情

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

【讨论】:

以上是关于Apache Spark Codegen Stage 超过 64 KB的主要内容,如果未能解决你的问题,请参考以下文章

解决spark中遇到的数据倾斜问题

如何在并行火花中运行转换

Fmt 0.9.0 破坏了我的代码,我怎样才能使它兼容? Format.pp_set_formatter_stag_functions

从 graphql-codegen 获取类似于 apollo codegen 生成的类型

类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD

Apache-Spark 作为日志存储