Cannot isolate JDBC memory leak in Scala bulk data loading application
Posted: 2014-01-24 20:48:21

Question: I wrote a sensor network web service in Scala (this is my first Scala application; yes, I know it uses Spring dependency injection, and I'm ripping a lot of that out. Don't judge). Anyway, the company sponsoring it went under, and I extracted all of the data from the web service's raw XML packets. Now I'm trying to load that data back into a database. The service supports Postgres, MySQL, and Microsoft SQL (the original database we used). I can get Postgres to load just fine, but MySQL/Microsoft keep hitting OutOfMemory problems even with an 8 GB or 12 GB heap.
I think the Postgres driver may just be more efficient with its internal data structures, because I do see memory grow and not get released with Postgres as well, just not nearly as much. I'm careful to close my ResultSet and Connection objects, but I'm still missing something.
Here is my Scala bulk loader. It takes a tar.bz2 of XML files and loads them into the database:
val BUFFER_SIZE = 4096
val PACKAGE_CHUNK_SIZE = 10000

def main(args : Array[String]) {
  if(args.length != 1) {
    System.err.println("Usage: %s [bzip2 file]".format(this.getClass))
    System.exit(1)
  }
  val loader = MySpring.getObject("FormatAGRA.XML").asInstanceOf[FormatTrait]
  val db = MySpring.getObject("serviceDataHandler").asInstanceOf[ServiceDataHandlerTrait]

  val bzin = new TarArchiveInputStream(new BZip2CompressorInputStream(new BufferedInputStream(new FileInputStream(args(0)))))

  val models = new ListBuffer[ModelTrait]()
  var chunks = 0
  Stream.continually(bzin.getNextEntry()).takeWhile(_ != null) foreach { entry =>
    if(entry.asInstanceOf[TarArchiveEntry].isFile()) {
      val xmlfile = new ByteArrayOutputStream()
      IOUtils.copy(bzin, xmlfile)
      //val models = new ListBuffer[ModelTrait]()
      models.appendAll( loader.loadModels(new String(xmlfile.toByteArray())) )
      System.out.println(String.format("Processing Entry %s", entry.getName));
      chunks = chunks + 1
      if( chunks % PACKAGE_CHUNK_SIZE == 0) {
        System.out.println("Sending batch of %d to database".format(PACKAGE_CHUNK_SIZE))
        db.loadData(models.toList.asInstanceOf[List[DataModel]])
        models.clear()
      }
    }
  }
}
Now for the nasty Spring details. Here are my beans:
<bean id="serviceDataHandler" parent="baseDataHandler" class="io.bigsense.db.ServiceDataHandler">
<property name="ds" ref="serviceDataSource" />
</bean>
<!-- Database configurations -->
<bean id="baseDataSource" abstract="true" class="com.jolbox.bonecp.BoneCPDataSource" destroy-method="close">
<property name="driverClass" value="dbDriver" />
<property name="jdbcUrl" value="connectionString" />
<property name="idleConnectionTestPeriod" value="60"/>
<property name="idleMaxAge" value="240"/>
<property name="maxConnectionsPerPartition" value="dbPoolMaxPerPart"/>
<property name="minConnectionsPerPartition" value="dbPoolMinPerPart"/>
<property name="partitionCount" value="dbPoolPartitions"/>
<property name="acquireIncrement" value="5"/>
<property name="statementsCacheSize" value="100"/>
<property name="releaseHelperThreads" value="3"/>
</bean>
<bean id="serviceDataSource" parent="baseDataSource" >
<property name="username" value="dbUser"/>
<property name="password" value="dbPass"/>
</bean>
Things like dbUser/dbPass/connectionString/dbDriver are substituted at compile time (a later release will use a runtime properties file so you don't have to recompile the war for different configurations), but you get the basic idea.
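For reference, a minimal sketch of what that planned runtime configuration might look like; the file name and property keys here are hypothetical, not taken from the project:

import java.io.FileInputStream
import java.util.Properties

object RuntimeConfig {
  // Hypothetical example: read connection settings from a properties file at startup
  // instead of substituting them into the Spring XML at compile time.
  def load(path: String = "bigsense.properties"): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }
}

// Usage sketch:
// val cfg = RuntimeConfig.load()
// val jdbcUrl = cfg.getProperty("connectionString")
// val dbUser  = cfg.getProperty("dbUser")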
The format pulled in as FormatAGRA.XML just reads the XML into an object (yes, I know it's ugly... XML is going away in the next release, JSON only!):
class AgraDataXMLFormat extends FormatTrait {

  def renderModels(model : List[ModelTrait]) : String = {
    if(model.length > 0) {
      model.head match {
        case x:DataModel => {
          return <AgraData>
          {for( pack <- model.asInstanceOf[List[DataModel]]) yield {
            <package id={pack.uniqueId} timestamp={pack.timestamp}>
            <sensors>
            {for( sensor <- pack.sensors) yield {
              <sensor id={sensor.uniqueId} type={sensor.stype} units={sensor.units} timestamp={sensor.timestamp}>
              <data>{sensor.data}</data></sensor>
            }}
            </sensors><errors>{ for(error <- pack.errors) yield {
              <error>{error}</error>
            }}
            </errors></package>
          }}
          </AgraData>.toString()
        }
        case x:RelayModel => {
          return <AgraRelays>
          {for( r <- model.asInstanceOf[List[RelayModel]]) yield {
            /* TODO Get this working */
            /* <relay id={r.id} identifier={r.identifier} publicKey={r.publicKey} /> */
          }}
          </AgraRelays>.toString()
        }
        case _ => {
          //TODO: This needs to be an exception to generate a 400 BAD RESPONSE
          "Format not implemented for given model Type"
        }
      }
    }
    //TODO throw exception? No...hmm
    ""
  }

  def loadModels(data : String) : List[ModelTrait] = {
    var xml : Elem = XML.loadString(data)
    var log : Logger = Logger.getLogger(this.getClass())
    var models = new ListBuffer[DataModel]
    for( pack <- xml \\ "package") yield {
      var model = new DataModel()
      var sensors = pack \ "sensors"
      var errors = pack \ "errors"
      model.timestamp = (pack \ "@timestamp").text.trim()
      model.uniqueId = (pack \ "@id").text.trim()
      var sbList = new ListBuffer[SensorModel]()
      var sbErr = new ListBuffer[String]()
      for( node <- sensors \ "sensor") yield {
        var sensorData = new SensorModel()
        sensorData.uniqueId = (node \ "@id").text.trim()
        sensorData.stype = (node \ "@type").text.trim()
        sensorData.units = (node \ "@units").text.trim()
        sensorData.timestamp = (node \ "@timestamp").text.trim()
        sensorData.data = (node \ "data").text.trim()
        sbList += sensorData
      }
      for( err <- errors \ "error") yield {
        model.errors.append(err.text.trim())
      }
      model.sensors = sbList.toList
      models += model
    }
    models.toList
  }
}
And finally the fun part: the database stuff. There's a base trait with some boilerplate for closing connections and running queries. Every query goes through this runQuery(). I am closing connections and result sets, yet I can't quite figure out where the leak is. I have a feeling it's something about how I'm handling JDBC, because even though there's a leak with PostgreSQL too (I can see memory usage climbing), it still manages to finish the load without exhausting an 8 GB heap. The same data set fails around 100,000 records in with MS SQL and MySQL.
trait DataHandlerTrait {

  @BeanProperty
  var ds : DataSource = _
  @BeanProperty
  var converters : scala.collection.mutable.Map[String,ConverterTrait] = _
  @BeanProperty
  var sqlCommands : EProperties = _
  @BeanProperty
  var dbDialect : String = _

  val DB_MSSQL = "mssql"
  val DB_MYSQL = "mysql"
  val DB_PGSQL = "pgsql"

  protected var log = Logger.getLogger(getClass())

  //Taken From: http://zcox.wordpress.com/2009/08/17/simple-jdbc-queries-in-scala/
  protected def using[Closeable <: { def close(): Unit }, B](closeable: Closeable)(getB: Closeable => B): B =
    try {
      getB(closeable)
    } finally {
      try { closeable.close() } catch { case e:Exception => }
    }

  protected def runQuery(req: DBRequest): DBResult = {
    val retval = new DBResult()
    val consBuilder = new StringBuilder(sqlCommands.getProperty(req.queryName))
    val paramList: ListBuffer[Any] = new ListBuffer()
    paramList.appendAll(req.args)

    //constraints
    // (we can't use mkString because we need to deal with the
    // complex case of something being an actual constraint (happens in the query)
    // or a conversion (happens row by row))
    var whereAnd = " WHERE "
    for ((para, list) <- req.constraints) {
      val con = sqlCommands.getProperty("constraint" + para)
      if ((!converters.contains(para)) && (con == null || con == "")) {
        throw new DatabaseException("Unknown Constraint: %s".format(para))
      }
      else if (!converters.contains(para)) {
        for (l <- list) {
          consBuilder.append(whereAnd)
          consBuilder.append(con)
          paramList.append(l)
          whereAnd = " AND "
        }
      }
    }
... and finally, the actual data loading function (the remainder of runQuery continues after it):
def loadData(sets : List[DataModel]) : List[Int] = {
  val log = Logger.getLogger(this.getClass())
  var generatedIds : ListBuffer[Int] = new ListBuffer()

  using(ds.getConnection()) { conn =>
    //Start Transaction
    conn.setAutoCommit(false)
    try {
      var req : DBRequest = null
      sets.foreach( set => {
        req = new DBRequest(conn,"getRelayId")
        req.args = List(set.uniqueId)
        val rid : DBResult = runQuery(req)
        var relayId : java.lang.Integer = null
        if(rid.results.length == 0) {
          req = new DBRequest(conn,"registerRelay")
          req.args = List(set.uniqueId)
          relayId = runQuery(req).generatedKeys(0).asInstanceOf[Int]
        }
        else {
          relayId = rid.results(0)("id").toString().toInt;
        }

        req = new DBRequest(conn,"addDataPackage")
        req.args = List(TimeHelper.timestampToDate(set.timestamp),relayId)
        val packageId = runQuery(req)
          .generatedKeys(0)
          .asInstanceOf[Int]
        generatedIds += packageId //We will pull data in GET via packageId

        var sensorId : java.lang.Integer = -1
        set.sensors.foreach( sensor => {
          req = new DBRequest(conn,"getSensorRecord")
          req.args = List(relayId,sensor.uniqueId)
          val sid : DBResult = runQuery(req)
          if(sid.results.length == 0) {
            req = new DBRequest(conn,"addSensorRecord")
            req.args = List(sensor.uniqueId,relayId,sensor.stype,sensor.units)
            sensorId = runQuery(req).generatedKeys(0).toString().toInt
          }
          else {
            sensorId = sid.results(0)("id").toString().toInt;
          }
          req = new DBRequest(conn,"addSensorData")
          req.args = List(packageId,sensorId,sensor.data)
          runQuery(req)
        })

        set.processed.foreach( pro => {
          pro.units match {
            case "NImageU" => {
              req = new DBRequest(conn,"addImage")
              req.args = List(packageId, sensorId, new ByteArrayInputStream(Base64.decodeBase64(pro.data)))
              runQuery(req)
            }
            case "NCounterU" => { /*TODO: Implement Me*/ }
            case _ => set.errors.append("Unknown Processed Unit Type %s for Sensor %s With Data %s at Time %s"
              .format(pro.units,pro.uniqueId,pro.data,pro.timestamp))
          }
        })

        set.errors.foreach( error => {
          req = new DBRequest(conn,"addError")
          req.args = List(packageId,error)
          runQuery(req)
        })
      })
      conn.commit()
    }
    catch {
      case e:Exception => {
        //make sure we unlock the transaction but pass the exception onward
        conn.rollback()
        throw e
      }
    }
    conn.setAutoCommit(true)
    generatedIds.toList
  }
}
The remainder of runQuery (group/order handling, statement preparation, parameter binding, and result extraction):

    //group by and order by
    for (i <- List(req.group, req.order)) yield {
      i match {
        case Some(i: String) => consBuilder.append(sqlCommands.getProperty(i))
        case None =>
      }
    }

    //prepare statement
    log.debug("SQL Statement: %s".format(consBuilder.toString()))

    /* PostgreSQL driver quirk. If you use RETURN_GENERATED_KEYS, it adds RETURNING
       to the end of every statement! Meanwhile, certain MySQL SELECT statements need RETURN_GENERATED_KEYS. */
    var keys = Statement.RETURN_GENERATED_KEYS
    if (dbDialect == DB_PGSQL) {
      keys = if (consBuilder.toString().toUpperCase().startsWith("INSERT")) Statement.RETURN_GENERATED_KEYS else Statement.NO_GENERATED_KEYS
    }

    using(req.conn.prepareStatement(consBuilder.toString(), keys)) { stmt =>
      //row limit
      if (req.maxRows > 0) {
        stmt.setMaxRows(req.maxRows)
      }

      var x = 1
      paramList.foreach(a => {
        log.debug("Parameter %s: %s".format(x, a))
        a.asInstanceOf[AnyRef] match {
          case s: java.lang.Integer =>
            stmt.setInt(x, s)
          case s: String =>
            stmt.setString(x, s)
          case s: Date =>
            stmt.setDate(x, s, Calendar.getInstance(TimeZone.getTimeZone("UTC")))
          case s: Time =>
            stmt.setTime(x, s)
          case s: Timestamp =>
            stmt.setTimestamp(x, s, Calendar.getInstance(TimeZone.getTimeZone("UTC")))
          case s: ByteArrayInputStream =>
            stmt.setBinaryStream(x, s, s.asInstanceOf[ByteArrayInputStream].available())
          case s =>
            stmt.setObject(x, s)
        }
        x += 1
      })

      //run statement
      stmt.execute()
      log.debug("Statement Executed")

      //get auto-insert keys
      val keys = stmt.getGeneratedKeys()
      if (keys != null) {
        var keybuf = new ListBuffer[Any]();
        while (keys.next()) {
          keybuf += keys.getInt(1)
        }
        retval.generatedKeys = keybuf.toList
      }

      //pull results
      log.debug("Pulling Results")
      using(stmt.getResultSet()) { ret =>
        if (ret != null) {
          val meta = ret.getMetaData()
          var retbuf = new ListBuffer[Map[String, Any]]()
          while (ret.next) {
            val rMap = scala.collection.mutable.Map[String, Any]()
            for (i <- 1 to meta.getColumnCount()) {
              rMap += (meta.getColumnLabel(i) -> ret.getObject(i))
            }
            //conversion
            for ((para, arg) <- req.constraints) {
              if (converters.contains(para)) {
                for (a <- arg) {
                  log.debug("Running Conversion %s=%s".format(para, a))
                  converters(para).convertRow(rMap, a.toString)
                }
              }
            }
            retbuf += Map(rMap.toSeq: _*)
          }
          retval.results = retbuf.toList
          ret.close()
        }
      }
      log.debug("Result Pull Complete")
    }
    retval
  }
}
I've tried pumping it through a monitor and looking at heap dumps, but I can't figure out where to attack the problem. I know I could split the bzip into smaller batches, but if there's a memory leak I really need to nail it down. I don't want to have to restart cluster members every month or so in production.
Here's the commit of the current copy of the source code I'm working with:
https://github.com/sumdog/BigSense/tree/fbd026124e09785bfecc834af6932b9952945fc6
Comments:
In your position I would look at heap usage with VisualVM, the Eclipse Memory Analyzer, or jhat. I'm not sure there's any other reasonable way to do it.

Answer 1:

You've posted a lot of information here. My first pass at a memory leak would be to make sure all the old input and output streams are closed (though I doubt they're the problem; do you close the TarInputStream and the ByteArrayOutputStream?). Once that all checks out, I'd look at which files the process has open. On Linux, take the process ID and run "ls /proc/$processID/fd" to see every file the process has open. Finally, go into VisualVM or another profiler and watch what's happening with the number of open threads... if they keep climbing over time, you definitely have a leak from unclosed connections. If there's no difference in thread counts between your runs against Postgres and the "others", then perhaps you can conclude the problem lies with the "others". And if the "others" struggle with processing the input file's size in one go, then splitting the input file could in theory work around their problem.
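A minimal sketch of that file-descriptor check done from inside the JVM itself, assuming Linux with /proc mounted; the pid lookup via the RuntimeMXBean name ("pid@hostname") is a HotSpot convention rather than a guaranteed API:

import java.io.File
import java.lang.management.ManagementFactory

object FdCheck {
  def main(args: Array[String]): Unit = {
    // HotSpot reports the runtime name as "pid@hostname"; split off the pid
    val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
    // Listing /proc/<pid>/fd shows every file descriptor this process currently holds open
    val openFds = Option(new File(s"/proc/$pid/fd").listFiles()).map(_.length).getOrElse(-1)
    println(s"Open file descriptors: $openFds")
  }
}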
Answer 2:

Found the problem. After running everything through VisualVM, I noticed the thread count stayed constant, but there were still tons of JDBC4ResultSet objects hanging around. I thought I was closing all of them, but then I took a closer look and noticed this:
//get auto-insert keys
val keys = stmt.getGeneratedKeys()
if (keys != null) {
  var keybuf = new ListBuffer[Any]();
  while (keys.next()) {
    keybuf += keys.getInt(1)
  }
  retval.generatedKeys = keybuf.toList
}
I hadn't realized that stmt.getGeneratedKeys() actually returns a ResultSet! Changing it to use the Closeable wrapper solved the problem:
//get auto-insert keys
using(stmt.getGeneratedKeys()) { keys =>
  if (keys != null) {
    var keybuf = new ListBuffer[Any]();
    while (keys.next()) {
      keybuf += keys.getInt(1)
    }
    retval.generatedKeys = keybuf.toList
  }
}
Before: [VisualVM screenshot omitted]
After: [VisualVM screenshot omitted]