Cannot isolate JDBC memory leak in Scala bulk data loading application

Posted: 2014-01-24 20:48:21

Question:

I wrote a sensor network web service in Scala (it's my first Scala application; yes, I know it uses Spring dependency injection. I'm ripping a lot of that out. Don't judge). Anyway, the company sponsoring it went under, and I extracted all the data from the web service as raw XML packets. I'm now trying to load that data back into a database. The service supports Postgres, MySQL, and Microsoft SQL Server (the original database we ran on). I can get Postgres to load fine, but MySQL/MS SQL keep running into OutOfMemory problems, even when I use an 8 GB or 12 GB heap.

I suspect the Postgres driver may simply be more efficient with its internal data structures, because I see memory grow and never get released with Postgres as well, just not as much. I'm careful to close my ResultSet objects and Connection objects, but I'm still missing something.
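By "careful" I mean everything follows roughly this shape (a simplified sketch, not the actual loader code, which is further down): every JDBC object with a close() method gets released in a finally block:

import java.sql.ResultSet
import javax.sql.DataSource

// Simplified sketch of the close-in-finally discipline: Connection,
// Statement and ResultSet are each released even if the query throws.
def countRows(ds: DataSource, sql: String): Int = {
  val conn = ds.getConnection()
  try {
    val stmt = conn.prepareStatement(sql)
    try {
      val rs: ResultSet = stmt.executeQuery()
      try {
        rs.next()
        rs.getInt(1)
      } finally rs.close()
    } finally stmt.close()
  } finally conn.close()
}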

Here is my Scala bulk loader. It takes a tar.bz2 of XML files and loads them into the database:

val BUFFER_SIZE = 4096
val PACKAGE_CHUNK_SIZE = 10000

def main(args : Array[String]) {

  if(args.length != 1) {
    System.err.println("Usage: %s [bzip2 file]".format(this.getClass))
    System.exit(1)
  }

  val loader = MySpring.getObject("FormatAGRA.XML").asInstanceOf[FormatTrait]
  val db     = MySpring.getObject("serviceDataHandler").asInstanceOf[ServiceDataHandlerTrait]

  val bzin = new TarArchiveInputStream(new BZip2CompressorInputStream(new BufferedInputStream(new FileInputStream(args(0)))))
  val models = new ListBuffer[ModelTrait]()
  var chunks = 0

  Stream.continually(bzin.getNextEntry()).takeWhile(_ != null) foreach { entry =>
    if(entry.asInstanceOf[TarArchiveEntry].isFile()) {

      val xmlfile = new ByteArrayOutputStream()
      IOUtils.copy(bzin, xmlfile)
      //val models = new ListBuffer[ModelTrait]()
      models.appendAll( loader.loadModels(new String(xmlfile.toByteArray())) )
      System.out.println(String.format("Processing Entry %s", entry.getName))

      chunks = chunks + 1
      if( chunks % PACKAGE_CHUNK_SIZE == 0) {
        System.out.println("Sending batch of %d to database".format(PACKAGE_CHUNK_SIZE))
        db.loadData(models.toList.asInstanceOf[List[DataModel]])
        models.clear()
      }
    }
  }
}
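(Side note, unrelated to the leak: since the batch is only flushed when chunks hits an exact multiple of PACKAGE_CHUNK_SIZE, a trailing partial batch never gets sent. A final flush after the loop, sketched with the same names, would cover it, along with closing the tar stream:)

// Hypothetical final flush: send whatever is left in the buffer once the
// tar stream is exhausted, then release the stream itself.
if (models.nonEmpty) {
  System.out.println("Sending final batch of %d to database".format(models.length))
  db.loadData(models.toList.asInstanceOf[List[DataModel]])
  models.clear()
}
bzin.close()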

Now for those nasty Spring details. Here are my beans:

<bean id="serviceDataHandler" parent="baseDataHandler" class="io.bigsense.db.ServiceDataHandler">
    <property name="ds" ref="serviceDataSource" />
</bean>

<!-- Database configurations -->
<bean id="baseDataSource" abstract="true" class="com.jolbox.bonecp.BoneCPDataSource" destroy-method="close">
   <property name="driverClass" value="dbDriver" />
   <property name="jdbcUrl" value="connectionString" />
   <property name="idleConnectionTestPeriod" value="60"/>
   <property name="idleMaxAge" value="240"/>
   <property name="maxConnectionsPerPartition" value="dbPoolMaxPerPart"/>
   <property name="minConnectionsPerPartition" value="dbPoolMinPerPart"/>
   <property name="partitionCount" value="dbPoolPartitions"/>
   <property name="acquireIncrement" value="5"/>
   <property name="statementsCacheSize" value="100"/>
   <property name="releaseHelperThreads" value="3"/>
</bean>

<bean id="serviceDataSource" parent="baseDataSource" >
   <property name="username" value="dbUser"/>
   <property name="password" value="dbPass"/>
</bean>

Things like dbUser/dbPass/connectionString/dbDriver get substituted at compile time (a later version will use a runtime properties file so you don't have to recompile the war for different configurations). But you get the basic idea.
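(For reference, the runtime-properties version would presumably just be Spring's stock placeholder mechanism, roughly like the following; the properties file name here is made up:)

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
   <property name="location" value="classpath:bigsense.properties" />
</bean>

<bean id="serviceDataSource" parent="baseDataSource" >
   <property name="username" value="${dbUser}"/>
   <property name="password" value="${dbPass}"/>
</bean>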

The model pulled in as FormatAGRA.XML just reads the XML into an object (yes, I know it's terrible... the XML is going away in the next version, JSON only!):

class AgraDataXMLFormat extends FormatTrait {

  def renderModels(model : List[ModelTrait]) : String = {

    if(model.length > 0) {
      model.head match {
        case x:DataModel => {
          return <AgraData>{
            for( pack <- model.asInstanceOf[List[DataModel]]) yield {
              <package id={pack.uniqueId} timestamp={pack.timestamp}>
              <sensors>{
                for( sensor <- pack.sensors) yield {
                  <sensor id={sensor.uniqueId} type={sensor.stype} units={sensor.units} timestamp={sensor.timestamp}>
                  <data>{sensor.data}</data></sensor>
                }
              }</sensors><errors>{ for(error <- pack.errors) yield {
                <error>{error}</error>
              }
              }</errors></package>
            }
          }</AgraData>.toString()
        }
        case x:RelayModel => {
          return <AgraRelays>{
            for( r <- model.asInstanceOf[List[RelayModel]]) yield {
              /* TODO Get this working */
              /* <relay id={r.id} identifier={r.identifier} publicKey={r.publicKey} /> */
            }
          }</AgraRelays>.toString()
        }
        case _ => {
          //TODO: This needs to be an exception to generate a 400 BAD RESPONSE
          "Format not implemented for given model Type"
        }
      }
    }

    //TODO throw exception? No...hmm
    ""
  }

  def loadModels(data : String) : List[ModelTrait] = {

    var xml : Elem = XML.loadString(data)
    var log : Logger = Logger.getLogger(this.getClass())
    var models = new ListBuffer[DataModel]

    for( pack <- xml \\ "package") yield {

      var model = new DataModel()

      var sensors = pack \ "sensors"
      var errors = pack \ "errors"
      model.timestamp = (pack \ "@timestamp").text.trim()
      model.uniqueId = (pack \ "@id").text.trim()

      var sbList = new ListBuffer[SensorModel]()
      var sbErr = new ListBuffer[String]()

      for( node <- sensors \ "sensor") yield {
        var sensorData = new SensorModel()
        sensorData.uniqueId = (node \ "@id").text.trim()
        sensorData.stype = (node \ "@type").text.trim()
        sensorData.units = (node \ "@units").text.trim()
        sensorData.timestamp = (node \ "@timestamp").text.trim()
        sensorData.data = (node \ "data").text.trim()
        sbList += sensorData
      }
      for( err <- errors \ "error") yield {
        model.errors.append(err.text.trim())
      }
      model.sensors = sbList.toList
      models += model
    }
    models.toList
  }
}
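For reference, the packet format that loadModels parses (and renderModels emits) looks roughly like this; the values here are invented:

<AgraData>
  <package id="relay-001" timestamp="2014-01-20 10:30:00">
    <sensors>
      <sensor id="temp0" type="Temperature" units="C" timestamp="2014-01-20 10:30:00">
        <data>21.4</data>
      </sensor>
    </sensors>
    <errors>
      <error>low battery</error>
    </errors>
  </package>
</AgraData>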

Finally, the interesting part: the database stuff. There's a base trait with some boilerplate for closing connections and running queries. All queries go through this runQuery(). I am closing connections and result sets, and I can't quite figure out where the leak is. I have a feeling it's something in how I'm working with JDBC, because even though there's a leak with PostgreSQL too (I can watch the memory usage grow), it still manages to finish the load without exhausting an 8 GB heap. The same data set fails on MS SQL and MySQL at around 100,000 records.

trait DataHandlerTrait {

  @BeanProperty
  var ds : DataSource = _

  @BeanProperty
  var converters : scala.collection.mutable.Map[String,ConverterTrait] = _

  @BeanProperty
  var sqlCommands : EProperties = _

  @BeanProperty
  var dbDialect : String = _

  val DB_MSSQL = "mssql"
  val DB_MYSQL = "mysql"
  val DB_PGSQL = "pgsql"

  protected var log = Logger.getLogger(getClass())

  //Taken From: http://zcox.wordpress.com/2009/08/17/simple-jdbc-queries-in-scala/
  protected def using[Closeable <: { def close(): Unit }, B](closeable: Closeable)(getB: Closeable => B): B =
    try {
      getB(closeable)
    } finally {
      try { closeable.close() } catch { case e:Exception => }
    }

  protected def runQuery(req: DBRequest): DBResult = {

    val retval = new DBResult()

    val consBuilder = new StringBuilder(sqlCommands.getProperty(req.queryName))

    val paramList: ListBuffer[Any] = new ListBuffer()
    paramList.appendAll(req.args)

    //constraints
    // (we can't use mkString because we need to deal with the
    //  complex case of whether something is an actual constraint (happens in the query)
    //  or a conversion (happens row by row))
    var whereAnd = " WHERE "
    for ((para, list) <- req.constraints) {
      val con = sqlCommands.getProperty("constraint" + para)
      if ((!converters.contains(para)) && (con == null || con == "")) {
        throw new DatabaseException("Unknown Constraint: %s".format(para))
      }
      else if (!converters.contains(para)) {
        for (l <- list) {
          consBuilder.append(whereAnd)
          consBuilder.append(con)
          paramList.append(l)
          whereAnd = " AND "
        }
      }
    }
    // ... (runQuery continues further down)

... and finally, the actual data loading function:

def loadData(sets : List[DataModel]) : List[Int] = {

  val log = Logger.getLogger(this.getClass())
  var generatedIds : ListBuffer[Int] = new ListBuffer()

  using(ds.getConnection()) { conn =>
    //Start Transaction
    conn.setAutoCommit(false)
    try {
      var req : DBRequest = null

      sets.foreach( set => {
        req = new DBRequest(conn,"getRelayId")
        req.args = List(set.uniqueId)
        val rid : DBResult = runQuery(req)

        var relayId : java.lang.Integer = null
        if(rid.results.length == 0) {
          req = new DBRequest(conn,"registerRelay")
          req.args = List(set.uniqueId)
          relayId = runQuery(req).generatedKeys(0).asInstanceOf[Int]
        }
        else {
          relayId = rid.results(0)("id").toString().toInt
        }

        req = new DBRequest(conn,"addDataPackage")
        req.args = List(TimeHelper.timestampToDate(set.timestamp),relayId)
        val packageId = runQuery(req)
          .generatedKeys(0)
          .asInstanceOf[Int]
        generatedIds += packageId //We will pull data in GET via packageId

        var sensorId : java.lang.Integer = -1
        set.sensors.foreach( sensor => {
          req = new DBRequest(conn,"getSensorRecord")
          req.args = List(relayId,sensor.uniqueId)
          val sid : DBResult = runQuery(req)
          if(sid.results.length == 0) {
            req = new DBRequest(conn,"addSensorRecord")
            req.args = List(sensor.uniqueId,relayId,sensor.stype,sensor.units)
            sensorId = runQuery(req).generatedKeys(0).toString().toInt
          }
          else {
            sensorId = sid.results(0)("id").toString().toInt
          }
          req = new DBRequest(conn,"addSensorData")
          req.args = List(packageId,sensorId,sensor.data)
          runQuery(req)
        })
        set.processed.foreach( pro => {
          pro.units match {
            case "NImageU" => {
              req = new DBRequest(conn,"addImage")
              req.args = List(packageId, sensorId, new ByteArrayInputStream(Base64.decodeBase64(pro.data)))
              runQuery(req)
            }
            case "NCounterU" => { /*TODO: Implement Me*/ }
            case _ => { set.errors.append("Unknown Processed Unit Type %s for Sensor %s With Data %s at Time %s"
                .format(pro.units,pro.uniqueId,pro.data,pro.timestamp)) }
          }
        })
        set.errors.foreach( error => {
          req = new DBRequest(conn,"addError")
          req.args = List(packageId,error)
          runQuery(req)
        })
      })
      conn.commit()
    }
    catch {
      case e:Exception => {
        //make sure we unlock the transaction but pass the exception onward
        conn.rollback()
        throw e
      }
    }
    conn.setAutoCommit(true)
    generatedIds.toList
  }
}
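(An aside, not the leak: loadData makes several round trips per record. If it ever needs to go faster, JDBC batching would be the usual fix; a rough sketch against a hypothetical addSensorData-style INSERT, not the real schema:)

// Hypothetical: batch all sensor rows for one package into a single
// round trip. The SQL here is illustrative only.
using(conn.prepareStatement(
    "INSERT INTO sensor_data (package_id, sensor_id, data) VALUES (?, ?, ?)")) { stmt =>
  for (sensor <- set.sensors) {
    stmt.setInt(1, packageId)
    stmt.setInt(2, sensorId)
    stmt.setString(3, sensor.data)
    stmt.addBatch()
  }
  stmt.executeBatch()
}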


Picking runQuery back up where it was elided above, the rest appends the group/order clauses, prepares the statement, binds parameters, and collects generated keys and results:

  //group by and order by
  for (i <- List(req.group, req.order)) yield {
    i match {
      case Some(i: String) => consBuilder.append(sqlCommands.getProperty(i))
      case None =>
    }
  }

  //prepare statement
  log.debug("SQL Statement: %s".format(consBuilder.toString()))

  /* PostgreSQL driver quirk: if you use RETURN_GENERATED_KEYS, it adds RETURNING
     to the end of every statement! Meanwhile, certain MySQL SELECT statements need RETURN_GENERATED_KEYS. */
  var keys = Statement.RETURN_GENERATED_KEYS
  if (dbDialect == DB_PGSQL) {
    keys = if (consBuilder.toString().toUpperCase().startsWith("INSERT")) Statement.RETURN_GENERATED_KEYS else Statement.NO_GENERATED_KEYS
  }

  using(req.conn.prepareStatement(consBuilder.toString(), keys)) { stmt =>

    //row limit
    if (req.maxRows > 0) {
      stmt.setMaxRows(req.maxRows)
    }

    var x = 1
    paramList.foreach(a => {
      log.debug("Parameter %s: %s".format(x, a))
      a.asInstanceOf[AnyRef] match {
        case s: java.lang.Integer    => stmt.setInt(x, s)
        case s: String               => stmt.setString(x, s)
        case s: Date                 => stmt.setDate(x, s, Calendar.getInstance(TimeZone.getTimeZone("UTC")))
        case s: Time                 => stmt.setTime(x, s)
        case s: Timestamp            => stmt.setTimestamp(x, s, Calendar.getInstance(TimeZone.getTimeZone("UTC")))
        case s: ByteArrayInputStream => stmt.setBinaryStream(x, s, s.available())
        case s                       => stmt.setObject(x, s)
      }
      x += 1
    })

    //run statement
    stmt.execute()
    log.debug("Statement Executed")

    //get auto-insert keys
    val keys = stmt.getGeneratedKeys()
    if (keys != null) {
      var keybuf = new ListBuffer[Any]()
      while (keys.next()) {
        keybuf += keys.getInt(1)
      }
      retval.generatedKeys = keybuf.toList
    }

    //pull results
    log.debug("Pulling Results")
    using(stmt.getResultSet()) { ret =>
      if (ret != null) {

        val meta = ret.getMetaData()

        var retbuf = new ListBuffer[Map[String, Any]]()
        while (ret.next) {
          val rMap = scala.collection.mutable.Map[String, Any]()

          for (i <- 1 to meta.getColumnCount()) {
            rMap += (meta.getColumnLabel(i) -> ret.getObject(i))
          }

          //conversion
          for ((para, arg) <- req.constraints) {
            if (converters.contains(para)) {
              for (a <- arg) {
                log.debug("Running Conversion %s=%s".format(para, a))
                converters(para).convertRow(rMap, a.toString)
              }
            }
          }

          retbuf += Map(rMap.toSeq: _*)
        }

        retval.results = retbuf.toList
        ret.close()
      }
    }
    log.debug("Result Pull Complete")
  }

  retval
}
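For anyone unfamiliar with it, the using helper in the trait above is the loan pattern; the intended discipline is that every JDBC resource with a close() method gets its own wrapper, nested as needed. A minimal usage sketch:

// Nested loan pattern: connection, statement and result set are each
// closed automatically, innermost first, even when an exception is thrown.
using(ds.getConnection()) { conn =>
  using(conn.prepareStatement("SELECT 1")) { stmt =>
    using(stmt.executeQuery()) { rs =>
      while (rs.next()) {
        // read columns here
      }
    }
  }
}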


I've tried pumping this through a profiler and staring at heap dumps, but I can't figure out where to attack the problem. I know I could split the bzip2 into smaller batches, but if there's a memory leak, I really need to nail it down. I don't want to have to restart cluster members every month or so in production.
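(For anyone trying to reproduce this: I grab the heap dumps either by running with -XX:+HeapDumpOnOutOfMemoryError, or with jmap -dump:live,format=b,file=heap.hprof <pid> against the live process, and then open the resulting file in VisualVM.)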

Here is the commit of the source tree I'm currently working from:

https://github.com/sumdog/BigSense/tree/fbd026124e09785bfecc834af6932b9952945fc6

Comments:

In your position, I'd look at heap usage with VisualVM, the Eclipse Memory Analyzer Tool, or jhat. I'm not sure there's any other reasonable way to do it.

Answer 1:

You've posted a lot of information here. My first pass on a memory leak would be to make sure I'd closed all the old input and output streams (though I doubt they're the problem, since you do close the TarInputStream and the ByteArrayOutputStream). After making sure that's fine, I'd check which files the process has open. On Linux, I'd take the process ID and do "ls /proc/$processID/fd", which shows all the files the process has open. Finally, go into VisualVM or another profiler and watch what's happening with the number of open threads... if they keep increasing over time, you definitely have a leak from unclosed connections. If there's no difference in thread counts between your Postgres runs and the "others", then maybe you can conclude the problem lies with the "others". And if the "others" have trouble processing an input file of that size in one go, then splitting the input file could, in theory, work around their problem.


Answer 2:

Found the problem. After running everything through VisualVM, I noticed the thread count stayed constant, but there was still a huge number of JDBC4ResultSet objects hanging around. I thought I was closing all of them, but then I looked closer and noticed this:

//get auto-insert keys
val keys = stmt.getGeneratedKeys()
if (keys != null) {
  var keybuf = new ListBuffer[Any]()
  while (keys.next()) {
    keybuf += keys.getInt(1)
  }
  retval.generatedKeys = keybuf.toList
}
I hadn't realized that stmt.getGeneratedKeys() actually returns a ResultSet! Changing it to go through the using() wrapper fixed the problem:

    //get auto-insert keys
    using(stmt.getGeneratedKeys()) { keys =>
      if (keys != null) {
        var keybuf = new ListBuffer[Any]()
        while (keys.next()) {
          keybuf += keys.getInt(1)
        }
        retval.generatedKeys = keybuf.toList
      }
    }
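(A note on why this leaked at all: closing a Statement is normally supposed to close its ResultSets, including the generated-keys one, but with a pooling data source like BoneCP and statementsCacheSize set as above, close() presumably just returns the cached statement to the pool, so a ResultSet that was never explicitly closed can linger along with it.)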

Before and after VisualVM heap graphs (screenshots not preserved).
