Spark 2.1 hangs while reading a huge dataset

Posted: 2017-07-11 06:39:02

In my application I compare two different datasets (a source table from Hive and a destination from an RDBMS) for duplicates and mismatches. It works fine for smaller datasets, but when I try to compare more than 1 GB of data (the source alone) it hangs and throws a TIMEOUT ERROR. I tried .config("spark.network.timeout", "600s"), but even after increasing the network timeout it throws java.lang.OutOfMemoryError: GC overhead limit exceeded

 val spark = SparkSession.builder().master("local")
  .appName("spark remote")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/$user.name")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")    
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

 val source = spark.sql("SELECT * from sample.source").rdd.map(_.mkString(","))

 SparkSession.clearActiveSession()
 SparkSession.clearDefaultSession()

val sparkdestination = SparkSession.builder().master("local").appName("Database")
  .config("spark.network.timeout", "600s")
  .getOrCreate()

val jdbcUsername = "root"
val jdbcPassword = "root"
val url = "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false"

val connectionProperties = new java.util.Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)

val queryDestination = "(select * from destination) as dest"
val destination = sparkdestination.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))

I also tried destination.persist(StorageLevel.MEMORY_AND_DISK_SER) (as well as MEMORY_AND_DISK and DISK_ONLY), but with no success.
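
For reference, that attempt looked roughly like this (a sketch; only the import is added here):

import org.apache.spark.storage.StorageLevel

destination.persist(StorageLevel.MEMORY_AND_DISK_SER)
// the same call was also tried with StorageLevel.MEMORY_AND_DISK and StorageLevel.DISK_ONLY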

Edit: this is the original error stack:

17/07/11 12:49:43 INFO DAGScheduler: Submitting 22 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[13] at map at stack.scala:76)
17/07/11 12:49:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 22 tasks
17/07/11 12:49:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/07/11 12:51:38 INFO JDBCRDD: closed connection
17/07/11 12:51:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
17/07/11 12:51:38 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)

Edit 2:

I tried using:

 val options = Map(
  "url" -> "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
  "dbtable" -> queryDestination,
  "user" -> "root",
  "password" -> "root")

val destination = sparkdestination.read.options(options)
  .jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties())
  .rdd.map(_.mkString(","))

I checked that it works with a small amount of data, but for a large dataset it throws the error below.

ERROR

    17/07/11 14:12:46 INFO DAGScheduler: looking for newly runnable stages
17/07/11 14:12:46 INFO DAGScheduler: running: Set(ShuffleMapStage 1)
17/07/11 14:12:46 INFO DAGScheduler: waiting: Set(ResultStage 2)
17/07/11 14:12:46 INFO DAGScheduler: failed: Set()
17/07/11 14:12:50 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.175.160:39913 in memory (size: 19.9 KB, free: 353.4 MB)
17/07/11 14:14:47 WARN ServerConnector: 
17/07/11 14:15:32 WARN QueuedThreadPool: 
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.substring(String.java:1969)

17/07/11 14:15:32 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)

17/07/11 14:15:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@1e855db,BlockManagerId(driver, 192.168.175.160, 39913, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

17/07/11 14:15:32 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner

java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)

17/07/11 14:15:32 WARN QueuedThreadPool: Unexpected thread death: org.spark_project.jetty.util.thread.QueuedThreadPool$3@710104 in  SparkUISTARTED,8<=8<=200,i=5,q=0
17/07/11 14:15:32 INFO JDBCRDD: closed connection
17/07/11 14:15:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO SparkUI: Stopped Spark web UI at http://192.168.175.160:4040
17/07/11 14:15:32 INFO DAGScheduler: Job 0 failed: collect at stack.scala:93, took 294.365864 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)

17/07/11 14:15:32 INFO DAGScheduler: ShuffleMapStage 1 (map at stack.scala:85) failed in 294.165 s due to Stage cancelled because SparkContext was shut down
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@cfb906)
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1499762732342,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
17/07/11 14:15:32 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO DiskBlockManager: Shutdown hook called
17/07/11 14:15:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/11 14:15:32 INFO ShutdownHookManager: Shutdown hook called
17/07/11 14:15:32 INFO MemoryStore: MemoryStore cleared
17/07/11 14:15:32 INFO BlockManager: BlockManager stopped
17/07/11 14:15:32 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24/userFiles-194d73ba-fcfa-4616-ae17-78b0bba6b465
17/07/11 14:15:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Spark Configurations

While in development mode, I run with 2g of memory and 1 core. I am new to Spark, sorry for such a naive question.
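
For reference, a minimal sketch of that 2g / 1-core local setup (the names and flags below are assumptions, not copied from my running code):

// one core via local[1]; driver memory cannot be raised from inside the application in local mode,
// it has to be set before the JVM starts, e.g. spark-submit --driver-memory 2g (or -Xmx2g from an IDE)
val devSpark = SparkSession.builder()
  .master("local[1]")
  .appName("spark remote")
  .getOrCreate()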

Thanks!

Question comments:

Wasn't your question closed yesterday?
@eliasah, no, I have been trying since yesterday.
My mistake! It wasn't closed, but you deleted it after it was downvoted, and then you posted the same question ***.com/questions/45007011/…
Yes, they downvoted without any explanation!
It was a very poor-quality question, and people do not always have to give a reason for downvoting every question; that is very time-consuming. Perhaps the downvoter should have pointed you to ***.com/help/how-to-ask

Answer 1:

First of all, you are starting two SparkSessions, which is quite pointless: you are just splitting your resources. So don't do that!
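
For example, one session is enough for both reads; a minimal sketch, reusing the Hive metastore address and the url / queryDestination / connectionProperties you already defined in your question:

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("compare source and destination")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")
  .enableHiveSupport()
  .getOrCreate()

// the Hive source and the JDBC destination are read through the same session
val source = spark.sql("SELECT * FROM sample.source")
val destination = spark.read.jdbc(url, queryDestination, connectionProperties)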

Second, and this is where the problem is, there is a common misconception concerning Apache Spark's parallelism and the JDBC source (don't worry, it's a gotcha!).

This is mainly due to missing documentation (last time I checked).

Back to the problem. What actually happens is that the following line:

val destination = spark.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))

delegates the read to a single worker.

So basically, even if you have enough memory and you manage to read that data, the whole destination data will sit in a single partition. And a single partition means trouble! In other words, quite possibly:

java.lang.OutOfMemoryError: GC overhead limit exceeded

So what happens is that the single executor that is chosen to fetch the data gets overwhelmed and its JVM blows up.
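
You can verify this yourself; a minimal sketch, again reusing url, queryDestination and connectionProperties from your question:

// without any partitioning options the JDBC read normally comes back as a single partition
val unpartitioned = spark.read.jdbc(url, queryDestination, connectionProperties)
println(unpartitioned.rdd.partitions.size)   // typically 1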

Let's fix this now:

(Disclaimer: the code below is taken from spark-gotchas, of which I am one of the authors.)

So let's create some sample data and save it in our database:

val options = Map(
  "url" -> "jdbc:postgresql://127.0.0.1:5432/spark",
  "dbtable" -> "data",
  "driver" -> "org.postgresql.Driver",
  "user" -> "spark",
  "password" -> "spark"
)

val newData = spark.range(1000000)
  .select($"id", lit(""), lit(true), current_timestamp())
  .toDF("id", "name", "valid", "ts")

newData.write.format("jdbc").options(options).mode("append").save

Apache Spark provides two methods for distributed data loading over JDBC. The first one partitions the data on an integer column:

val dfPartitionedWithRanges = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), "id", 1, 5, 4, new java.util.Properties())

dfPartitionedWithRanges.rdd.partitions.size
// Int = 4

dfPartitionedWithRanges.rdd.glom.collect
// Array[Array[org.apache.spark.sql.Row]] = Array(
//   Array([1,foo,true,2012-01-01 00:03:00.0]),
//   Array([2,foo,false,2013-04-02 10:10:00.0]),
//   Array([3,bar,true,2015-11-02 22:00:00.0]),
//   Array([4,bar,false,2010-11-02 22:00:00.0]))

The partition column and bounds can be provided via options as well:

val optionsWithBounds = options ++ Map(
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "5",
  "numPartitions" -> "4"
)

spark.read.options(optionsWithBounds).format("jdbc").load

Another option would be to use a sequence of predicates, but I won't go into it here.
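
(For completeness, a minimal sketch of that variant against the sample data above; the id ranges are made up purely for illustration:)

// each predicate becomes one partition, i.e. one JDBC query
val predicates = Array(
  "id < 250000",
  "id BETWEEN 250000 AND 499999",
  "id BETWEEN 500000 AND 749999",
  "id >= 750000")

val dfPartitionedWithPredicates = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), predicates, new java.util.Properties())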

You can read more about Spark SQL and the JDBC source here, along with some other issues.

I hope this helps.

Answer comments:

Same error. I tried with a small amount of data and it works, but a large amount of data throws the error. I have edited the question with the updated code and error.
I did not use "driver" -> "org.mysql.jdbc.driver" because I supply the MySQL jar path.
@Vignesh please do not delete the old error messages, people reading your question will be confused. Also, please use one SparkSession! That was my first sentence.
I tried using the same SparkSession, but it says a val cannot be reassigned, so I used another one.
Why would you want to reassign the session?
