将 SQL ResultSet 视为 Scala 流
Posted
技术标签:
【中文标题】将 SQL ResultSet 视为 Scala 流【英文标题】:Treating an SQL ResultSet like a Scala Stream 【发布时间】:2012-03-27 01:13:07 【问题描述】:当我查询数据库并接收到(只进、只读)ResultSet 时,ResultSet 就像数据库行列表一样。
我正在尝试找到某种方法来将此 ResultSet 视为 Scala Stream
。这将允许进行filter
、map
等操作,同时不会消耗大量内存。
我实现了一个尾递归方法来提取单个项目,但这要求所有项目同时在内存中,如果 ResultSet 非常大,就会出现问题:
// Iterate through the result set and gather all of the String values into a list
// then return that list
@tailrec
def loop(resultSet: ResultSet,
accumulator: List[String] = List()): List[String] =
if (!resultSet.next) accumulator.reverse
else
val value = resultSet.getString(1)
loop(resultSet, value +: accumulator)
【问题讨论】:
你能用 Iterable 而不是 Stream 来做你想做的事吗? 此外,流无论如何都会将值保留在内存中,因此当您到达列表末尾时,您实际上不会节省内存。 我认为如果没有 jdbc 标志/选项让 jdbc 本身流式传输结果,您仍然在内存中拥有一份完整的数据副本,由您的 jdbc api 构建。 【参考方案1】:我没测试过,为什么不行呢?
new Iterator[String]
def hasNext = resultSet.next()
def next() = resultSet.getString(1)
.toStream
【讨论】:
看起来很完美。我将在设置数据库后立即对其进行测试。我什至认为我不需要将其转换为Stream
。我可以直接向它申请map
、filter
等。
我想给你第二次投票。我已将此代码片段添加到我的 Scala sn-ps 库中。它正迅速成为我的最爱之一。
这是一个很酷的解决方案,但我担心。我认为Iterator
的通常合同是hasNext
是无副作用的。它可以在两次调用next
之间调用任意次数。有什么东西可以防止这成为一个问题吗?
在 mysql-connector-java
版本 6 中对我不起作用。不确定我是否做错了什么,但我的 ResultSet
在第二个 next()
呼叫中关闭,所以我只能检索一个结果行。在获得所有行之前它不会自动关闭的唯一方法似乎是使用while (rs.next()) ...
,因此我将项目单独添加到while
中的scala.collection.mutable.ListBuffer
。看起来不漂亮,但想不出任何其他方式。
@Nick 使用 new Iterator[String] ... .toList
而不是 .toStream
将立即获取整个结果集,而不仅仅是第一行。【参考方案2】:
@elbowich 回答的实用功能:
def results[T](resultSet: ResultSet)(f: ResultSet => T) =
new Iterator[T]
def hasNext = resultSet.next()
def next() = f(resultSet)
允许您使用类型推断。例如:
stmt.execute("SELECT mystr, myint FROM mytable")
// Example 1:
val it = results(stmt.resultSet)
case rs => rs.getString(1) -> 100 * rs.getInt(2)
val m = it.toMap // Map[String, Int]
// Example 2:
val it = results(stmt.resultSet)(_.getString(1))
【讨论】:
【参考方案3】:这听起来像是一个隐式类的好机会。首先在某处定义隐式类:
import java.sql.ResultSet
object Implicits
implicit class ResultSetStream(resultSet: ResultSet)
def toStream: Stream[ResultSet] =
new Iterator[ResultSet]
def hasNext = resultSet.next()
def next() = resultSet
.toStream
接下来,只需在执行查询并定义 ResultSet 对象的任何位置导入这个隐式类:
import com.company.Implicits._
最后使用 toStream 方法取出数据。比如获取所有的id如下图:
val allIds = resultSet.toStream.map(result => result.getInt("id"))
【讨论】:
你确定它有效吗?它在 DB2 上失败,ResultSet 被关闭。如果这适用于您的情况,可能取决于特定的数据库品牌和/或配置? 可以,但只要您的连接保持打开状态,您就只能使用该流。如果您关闭连接,则流将失败,迭代器也会失败。【参考方案4】:我需要类似的东西。在肘部非常酷的答案的基础上,我将它包装了一下,而不是字符串,我返回结果(这样你就可以得到任何列)
def resultSetItr(resultSet: ResultSet): Stream[ResultSet] =
new Iterator[ResultSet]
def hasNext = resultSet.next()
def next() = resultSet
.toStream
我需要访问表元数据,但这适用于表行(可以执行 stmt.executeQuery(sql) 而不是 md.getColumns):
val md = connection.getMetaData()
val columnItr = resultSetItr( md.getColumns(null, null, "MyTable", null))
val columns = columnItr.map(col =>
val columnType = col.getString("TYPE_NAME")
val columnName = col.getString("COLUMN_NAME")
val columnSize = col.getString("COLUMN_SIZE")
new Column(columnName, columnType, columnSize.toInt, false)
)
【讨论】:
如果您不需要返回流(例如,仅向前迭代),您可以使用迭代器。这大大减少了使用流的内存开销(返回Iterator[ResultSet]
,并删除toStream
)【参考方案5】:
因为 ResultSet 只是一个被 next 导航的可变对象,我们需要定义我们自己的下一行的概念。我们可以使用如下的输入函数来做到这一点:
class ResultSetIterator[T](rs: ResultSet, nextRowFunc: ResultSet => T)
extends Iterator[T]
private var nextVal: Option[T] = None
override def hasNext: Boolean =
val ret = rs.next()
if(ret)
nextVal = Some(nextRowFunc(rs))
else
nextVal = None
ret
override def next(): T = nextVal.getOrElse
hasNext
nextVal.getOrElse( throw new ResultSetIteratorOutOfBoundsException
)
class ResultSetIteratorOutOfBoundsException extends Exception("ResultSetIterator reached end of list and next can no longer be called. hasNext should return false.")
编辑: 转换为流或其他内容。
【讨论】:
【参考方案6】:Iterator.continually(rs.next())
.takeWhile(identity)
.map(_ => Model(
id = rs.getInt("id"),
text = rs.getString("text")
))
【讨论】:
【参考方案7】:这里有一个替代方案,类似于 Sergey Alaev 和 thoredge 的解决方案,当我们需要一个符合 Iterator
合同的解决方案时,hasNext
没有副作用。
假设一个函数f: ResultSet => T
:
Iterator.unfold(resultSet.next()) hasNext =>
Option.when(hasNext)(f(resultSet), resultSet.next())
我发现在ResultSet
上使用map
“扩展方法”很有用。
implicit class ResultSetOps(resultSet: ResultSet)
def map[T](f: ResultSet => T): Iterator[T] =
Iterator.unfold(resultSet.next()) hasNext =>
Option.when(hasNext)(f(resultSet), resultSet.next())
【讨论】:
【参考方案8】:这个实现虽然更长更笨拙,但它更符合 ResultSet 契约。副作用已从 hasNext(...) 中移除并移至 next()。
new Iterator[String]
private var available = resultSet.next()
override def hasNext: Boolean = available
override def next(): String =
val string = resultSet.getString(1)
available = resultSet.next()
string
【讨论】:
【参考方案9】:我认为上述大多数实现都有一个不确定的hasNext
方法。调用它两次会将光标移动到第二行。我建议使用类似的东西:
new Iterator[ResultSet]
def hasNext =
!resultSet.isLast
def next() =
resultSet.next()
resultSet
【讨论】:
【参考方案10】:上面的另一个变体,适用于 Scala 2.12:
implicit class ResultSetOps(resultSet: ResultSet)
def map[T](f: ResultSet => T): Iterator[T] =
Iterator.continually(resultSet).takeWhile(_.next()).map(f)
【讨论】:
以上是关于将 SQL ResultSet 视为 Scala 流的主要内容,如果未能解决你的问题,请参考以下文章
Scala:通过生成器(可迭代)公开 JDBC ResultSet