Apache spark inner join 2 dataframes getting TreeNodeException

Posted: 2019-06-17 15:42:31

【Question】

I'm very new to Apache Spark. I'm trying to inner join the books table with a table that has book_id and read_count. The goal is to produce a table of book titles and their corresponding read counts.

First, I have a table booksRead that contains records of users reading books, which I group by book_id and order by how often each book has been read.

booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
      .orderBy($"read_count".desc)
      .show()
+-------+----------+
|book_id|read_count|
+-------+----------+
|   8611|       565|
|     14|       436|
|  11850|       394|
|     15|       357|
|  11803|       324|
+-------+----------+
only showing top 5 rows

I'm trying to inner join it with the books table, which looks like this:

+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
|    id|          created_at|          updated_at|               title|      isbn_13|   isbn_10|           image_url|         description|           publisher|author_id|year|overall_rating|audible_link|          google_url|         query_title|category|language|number_of_reviews|waterstone_link|amazon_available|is_ebook|page_count|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
|115442|2018-07-25 00:59:...|2018-07-25 00:59:...|Representation of...|9781361479278|1361479272|http://books.goog...|This dissertation...|Open Dissertation...|    62130|2017|          null|        null|http://books.goog...|representation of...|        |      en|                0|           null|            true|   false|      null|
|115450|2018-07-25 00:59:...|2018-07-25 00:59:...|Imag(in)ing the W...|9789004182981|9004182985|http://books.goog...|This study examin...|               BRILL|    73131|2010|          null|        null|http://books.goog...|imagining the war...|        |      en|                0|           null|            true|   false|      null|
|218332|2018-08-19 14:48:...|2018-08-19 14:48:...|My Life With Tibe...|9781462802357|1462802354|http://books.goog...|Your child is a m...| Xlibris Corporation|   118091|2008|          null|        null|https://play.goog...|my life with tibe...|        |      en|                0|           null|            true|   false|      null|
|186991|2018-08-11 11:08:...|2018-08-11 11:08:...|  NOT "Just Friends"|9781416586401|1416586407|http://books.goog...|One of the world’...|  Simon and Schuster|     7687|2007|          null|        null|https://play.goog...|    not just friends|        |      en|                0|           null|            true|   false|      null|
|247317|2018-09-06 08:23:...|2018-09-06 08:23:...|OCR AS and A Leve...|9781910523056|1910523054|https://images-eu...|A complete course...|   PG Online Limited|   128220|2016|          null|        null|                null|ocr as and a leve...|    null| English|             null|           null|            true|   false|      null|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
only showing top 5 rows

joining book_id against the id column of the books table with this command:

booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
      .orderBy($"read_count".desc)
      .join(booksDF, booksReadDF.col("book_id") === booksDF.col("id"), "inner")
      .show()
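
For reference, the end result I'm after is just titles with their read counts; once the join works, it would look roughly like the sketch below (column names taken from the tables shown above, and assuming spark.implicits._ is imported):

// Sketch: aggregate reads per book, join back to books, keep title and read_count
val readCounts = booksReadDF
  .groupBy($"book_id")
  .agg(count("book_id").as("read_count"))

val titlesWithCounts = readCounts
  .join(booksDF, readCounts("book_id") === booksDF("id"), "inner")
  .select($"title", $"read_count")
  .orderBy($"read_count".desc)

titlesWithCounts.show()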

But when I run the join above, I get this error:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(book_id#4, 200)
+- *(3) Sort [read_count#67L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
      +- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
         +- Exchange hashpartitioning(book_id#4, 200)
            +- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
               +- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:383)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at DBConn$.main(DBConn.scala:36)
    at DBConn.main(DBConn.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
   +- Exchange hashpartitioning(book_id#4, 200)
      +- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
         +- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 52 more
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 56
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
    at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
    at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
    at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
    at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 72 more
19/06/17 16:21:47 INFO SparkContext: Invoking stop() from shutdown hook
19/06/17 16:21:47 INFO SparkUI: Stopped Spark web UI at http://10.245.65.12:4040
19/06/17 16:21:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/17 16:21:47 INFO MemoryStore: MemoryStore cleared
19/06/17 16:21:47 INFO BlockManager: BlockManager stopped
19/06/17 16:21:48 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/17 16:21:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/17 16:21:48 INFO SparkContext: Successfully stopped SparkContext
19/06/17 16:21:48 INFO ShutdownHookManager: Shutdown hook called
19/06/17 16:21:48 INFO ShutdownHookManager: Deleting directory /private/var/folders/ql/dpk0v2gs15z83pvwt_g3n7lh0000gn/T/spark-9368b8cb-0cf6-45a5-9548-a9c1975dab46

【Comments】

【Answer 1】

Your core exception is:

java.lang.IllegalArgumentException: Unsupported class file major version 56

Class file major version 56 corresponds to Java 12, so at some point you are trying to run bytecode compiled for a different Java version than your runtime supports. Make sure you are running Spark on a Java 8 JRE, and make sure all of your dependencies (e.g. your Postgres JDBC driver) are also built for Java 8.
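
As a quick sanity check (a sketch, not specific to your build), you can pin your build to Java 8 bytecode and confirm which JVM the application actually runs on. In sbt that might look like:

// build.sbt excerpt (sketch): emit Java 8 bytecode so Spark's bundled ASM
// (xbean asm6 in your stack trace) can read your compiled classes
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
scalacOptions += "-target:jvm-1.8"

// From inside the application you can also verify the running JVM:
// println(System.getProperty("java.version"))  // should report 1.8.x, not 12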

【Discussion】

You're absolutely right! I had JDK 12 installed and apparently my dependencies weren't compatible with it. Cheers!
