Spark 1.6:java.lang.IllegalArgumentException:spark.sql.execution.id 已设置
Posted
技术标签:
【中文标题】Spark 1.6:java.lang.IllegalArgumentException:spark.sql.execution.id 已设置【英文标题】:Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set 【发布时间】:2016-01-11 00:46:19 【问题描述】:我使用的是 spark 1.6,运行以下代码时遇到了上述问题:
// Imports
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.SaveMode
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.Properties
import scala.concurrent.Future
// Set up spark on local with 2 threads
val conf = new SparkConf().setMaster("local[2]").setAppName("app")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)
// Create fake dataframe
import sqlCtx.implicits._
var df = sc.parallelize(1 to 50000).map i => (i, i, i, i, i, i, i) .toDF("a", "b", "c", "d", "e", "f", "g").repartition(2)
// Write it as a parquet file
df.write.parquet("/tmp/parquet1")
df = sqlCtx.read.parquet("/tmp/parquet1")
// JDBC connection
val url = s"jdbc:postgresql://localhost:5432/tempdb"
val prop = new Properties()
prop.setProperty("user", "admin")
prop.setProperty("password", "")
// 4 futures - at least one of them has been consistently failing for
val x1 = Future df.write.jdbc(url, "temp1", prop)
val x2 = Future df.write.jdbc(url, "temp2", prop)
val x3 = Future df.write.jdbc(url, "temp3", prop)
val x4 = Future df.write.jdbc(url, "temp4", prop)
这里是 github 要点:https://gist.github.com/karanveerm/27d852bf311e39f05491
我得到的错误是: 在
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na]
at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na]
这是火花错误还是我做错了什么/任何解决方法?
【问题讨论】:
请问您在哪台机器上运行此代码?我对 CPU(多少个内核)特别感兴趣? OSX El Capitan 10.11.1 | MacBook Air(13 英寸,2014 年初)| 1.7 GHz 英特尔酷睿 i7 | 8 GB 1600 MHz DDR3(我相信 i7 是 4 核) 有趣,我无法在类似的设置(来自 spark shell)上重现这个。这可能是一些讨厌的错误,他们之前在生成 ID 时遇到了问题。您可能想为此创建一个 JIRA。 你运行的是哪个版本的 postgres? 我在 9.3.5 上运行 【参考方案1】:在尝试了几件事后,我发现由全局ForkJoinPool
创建的线程之一将其spark.sql.execution.id
属性设置为随机值。
我无法确定实际执行此操作的过程,但我可以使用我自己的 ExecutionContext
来解决它。
import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(executorService)
我使用了来自http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html 的代码。
也许ForkJoinPool
在创建新线程属性时会克隆线程属性,如果这发生在 SQL 执行的上下文中,它将获得其非空值,而FixedThreadPool
将在实例化时创建线程。
【讨论】:
我遇到了同样的问题。但是这个解决方案似乎没有帮助。我仍然看到spark.sql.execution.id already set
错误。
@smas 问题不在于线程数,而在于这些线程的初始化。分叉连接池将随机初始化线程并初始化新线程,它会克隆所有属性。因此,如果在初始化新线程时,现有线程设置了 SQL 执行 ID,它会将其复制到新线程,而不是生成新线程。【参考方案2】:
请检查SPARK-13747
如果适用于您的环境,请考虑使用 Spark 2.2.0 或更高版本。
【讨论】:
【参考方案3】:测试 1:如果您以串行方式而不是并行方式运行每个 df.write 操作是否有帮助?
测试 2:如果您持久化数据帧,然后并行执行所有 df.write 操作并在所有操作完成后进行序列化以取消持久化以查看这是否有帮助,是否有帮助?
【讨论】:
以上是关于Spark 1.6:java.lang.IllegalArgumentException:spark.sql.execution.id 已设置的主要内容,如果未能解决你的问题,请参考以下文章
Xposed出现 java.lang.IllegalAccessError: Class ref in pre-verified class resolved to unexpected imp(示例
Spark(1.6) Densevector.type 不带参数