GC overhead limit exceeded in Spark job with join

Posted: 2017-03-09 18:59:21

I am writing a Spark job to get the latest student record, filtered by the student date. When I tried this with a hundred thousand records it worked fine, but when I run it against a larger number of records my Spark job fails with the error below.

I suspect the error occurs because I load all the data from the table and put it into an RDD; the table contains about 4.2 million records. If that is the cause, is there a better way to load this data efficiently and complete the job successfully?
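For reference, my jdbc read has no partitioning hints, so as far as I understand the whole table comes back to one executor as a single partition. I assume Spark's partitioned-read options would spread the load, roughly like below (the partition column and bounds are only illustrative), but I have not verified that this fixes the error:

    val partitionedRead = sparkSession.read.format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("url", jobConfiguration.jdbcURL)
        .option("dbtable", "student_student")
        .option("user", test_user)
        .option("password", test_password)
        // split the table into several JDBC queries instead of one big result set
        .option("partitionColumn", "user_id")   // illustrative: must be a numeric column
        .option("lowerBound", "1")
        .option("upperBound", "4200000")
        .option("numPartitions", "8")
        .load()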

Could anyone please help me resolve this?

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.10.10.10): java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2157)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1964)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3316)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:463)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3040)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2288)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2681)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:408)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:379)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

17/03/09 10:54:09 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 10.10.10.10, partition 0, PROCESS_LOCAL, 5288 bytes)
17/03/09 10:54:09 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.10.10.10.
17/03/09 10:54:09 WARN TransportChannelHandler: Exception in connection from /10.10.10.10:48464
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
17/03/09 10:54:09 ERROR TaskSchedulerImpl: Lost executor 1 on 10.10.10.10: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
17/03/09 10:54:09 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170309105209-0032/1 is now EXITED (Command exited with code 52)

Code

object StudentDataPerformanceEnhancerImpl extends studentDataPerformanceEnhancer {
    val LOG = LoggerFactory.getLogger(this.getClass.getName)
    val USER_PRIMARY_KEY = "user_id"
    val COURSE_PRIMARY_KEY = "course_id"

    override def extractData(sparkContext: SparkContext, sparkSession: SparkSession, jobConfiguration: JobConfiguration): Unit = {
        val context = sparkSession.read.format("jdbc")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("url", jobConfiguration.jdbcURL)
            .option("dbtable", "student_student")
            .option("user", test_user)
            .option("password", test_password)
            .load()
        context.cache()

        val mainRDD = context.rdd.map(k => ((k.getLong(k.fieldIndex(USER_PRIMARY_KEY)),
            k.getLong(k.fieldIndex(COURSE_PRIMARY_KEY)),
            k.getTimestamp(k.fieldIndex("student_date_time"))),
            (k.getLong(k.fieldIndex(USER_PRIMARY_KEY)),
                k.getLong(k.fieldIndex(COURSE_PRIMARY_KEY)),
                k.getTimestamp(k.fieldIndex("student_date_time")),
                k.getString(k.fieldIndex("student_student_index")),
                k.getLong(k.fieldIndex("student_category_pk")),
                k.getString(k.fieldIndex("effeciency")),
                k.getString(k.fieldIndex("net_score")),
                k.getString(k.fieldIndex("avg_score")),
                k.getString(k.fieldIndex("test_score"))))).persist(StorageLevel.DISK_ONLY)

        LOG.info("Data extractions started....!")
        try {
            val studentCompositeRDD = context.rdd.map(r => ((r.getLong(r.fieldIndex(USER_PRIMARY_KEY)),
                r.getLong(r.fieldIndex(COURSE_PRIMARY_KEY))),
                r.getTimestamp(r.fieldIndex("student_date_time")))).reduceByKey((t1, t2) => if (t1.after(t2)) t1 else t2)
                .map(t => ((t._1._1, t._1._2, t._2), t._2)).persist(StorageLevel.DISK_ONLY)
            val filteredRDD = mainRDD.join(studentCompositeRDD).map(k => k._2._1)
            DataWriter.persistLatestData(filteredRDD)
        } catch {
            case e: Exception => LOG.error("Error in spark job: " + e.getMessage)
        }
    }
}

Below is my DataWriter class, which handles the database persistence:

object DataWriter {
    def persistLatestStudentRiskData(rDD: RDD[(Long, Long, Timestamp, String, Long, String, String, String, String)]): Unit = {
        var jdbcConnection: java.sql.Connection = null
        try {
            jdbcConnection = DatabaseUtil.getConnection
            if (jdbcConnection != null) {
                val statement = "call insert_latest_student_risk (?,?,?,?,?,?,?,?,?)"
                val callableStatement = jdbcConnection.prepareCall(statement)

                rDD.collect().foreach(x => sendLatestStudentRiskData(callableStatement, x))
            }
        } catch {
            case e: SQLException => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage)
            case e: RuntimeException => LOG.error("Error in the latest student persistence: " + e.getMessage)
            case e: Exception => LOG.error("Error in the latest student persistence: " + e.getMessage)
        } finally {
            if (jdbcConnection != null) {
                try {
                    jdbcConnection.close()
                } catch {
                    case e: SQLException => LOG.error("Error in jdbc connection close : " + e.getMessage)
                    case e: Exception => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage)
                }
            }
        }
    }

    def sendLatestStudentRiskData(callableStatement: java.sql.CallableStatement,
        latestStudentData: (Long, Long, Timestamp, String, Long,
            String, String, String, String)): Unit = {
        try {
            callableStatement.setLong(1, latestStudentData._1)
            callableStatement.setLong(2, latestStudentData._2)
            callableStatement.setTimestamp(3, latestStudentData._3)
            callableStatement.setString(4, latestStudentData._4)
            callableStatement.setLong(5, latestStudentData._5)
            callableStatement.setString(6, latestStudentData._6)
            callableStatement.setString(7, latestStudentData._7)
            callableStatement.setString(8, latestStudentData._8)
            callableStatement.setString(9, latestStudentData._9)

            callableStatement.executeUpdate
        } catch {
            case e: SQLException => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage)
        }
    }
}

Comments:

Where is DataWriter.persistLatestData defined?
@puhlen I have updated the question with DataWriter.persistLatestData.

Answer 1:

The problem isn't that you are putting data into an RDD; it is that you are pulling the data out of the RDD and into driver memory. Specifically, the problem is the collect call you use to persist the data, and you should remove it. collect brings the entire RDD into the driver's memory, at which point you are no longer using Spark and the cluster to process the data, so unless your data set is very small you will quickly run out of memory. collect should rarely be used in a Spark job; it is mainly useful for development and debugging with small amounts of data. It has some uses in production applications for supporting operations, but not as the main data flow.
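If you need to keep the stored-procedure call, one alternative (an untested sketch, assuming DatabaseUtil.getConnection and sendLatestStudentRiskData from your DataWriter can be reached from the executors) is to write from each partition instead of collecting:

    def persistLatestStudentRiskData(rDD: RDD[(Long, Long, Timestamp, String, Long, String, String, String, String)]): Unit = {
        rDD.foreachPartition { partition =>
            // one connection and callable statement per partition, opened on the executor
            val jdbcConnection = DatabaseUtil.getConnection
            try {
                val callableStatement = jdbcConnection.prepareCall("call insert_latest_student_risk (?,?,?,?,?,?,?,?,?)")
                partition.foreach(x => sendLatestStudentRiskData(callableStatement, x))
            } finally {
                jdbcConnection.close()
            }
        }
    }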

If you are using spark-sql, Spark can write to JDBC directly; take advantage of that and remove the call to collect.
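For example, if you keep the latest rows as a DataFrame rather than an RDD of tuples, something along these lines writes them out without ever collecting to the driver (latestStudentDF and the target table name latest_student_risk are placeholders here, not names from your code):

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    val connectionProperties = new Properties()
    connectionProperties.put("user", "test_user")          // placeholder credentials
    connectionProperties.put("password", "test_password")
    connectionProperties.put("driver", "com.mysql.jdbc.Driver")

    // each executor writes its own partitions; nothing is gathered on the driver
    latestStudentDF.write
        .mode(SaveMode.Append)
        .jdbc(jobConfiguration.jdbcURL, "latest_student_risk", connectionProperties)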
