将 SQL ResultSet 视为 Scala 流

Posted

技术标签:

【中文标题】将 SQL ResultSet 视为 Scala 流【英文标题】:Treating an SQL ResultSet like a Scala Stream 【发布时间】:2012-03-27 01:13:07 【问题描述】:

当我查询数据库并接收到(只进、只读)ResultSet 时,ResultSet 就像数据库行列表一样。

我正在尝试找到某种方法来将此 ResultSet 视为 Scala Stream。这将允许进行filtermap 等操作,同时不会消耗大量内存。

我实现了一个尾递归方法来提取单个项目,但这要求所有项目同时在内存中,如果 ResultSet 非常大,就会出现问题:

// Iterate through the result set and gather all of the String values into a list
// then return that list
@tailrec
def loop(resultSet: ResultSet,
         accumulator: List[String] = List()): List[String] = 
  if (!resultSet.next) accumulator.reverse
  else 
    val value = resultSet.getString(1)
    loop(resultSet, value +: accumulator)
  

【问题讨论】:

你能用 Iterable 而不是 Stream 来做你想做的事吗? 此外,流无论如何都会将值保留在内存中,因此当您到达列表末尾时,您实际上不会节省内存。 我认为如果没有 jdbc 标志/选项让 jdbc 本身流式传输结果,您仍然在内存中拥有一份完整的数据副本,由您的 jdbc api 构建。 【参考方案1】:

我没测试过,为什么不行呢?

new Iterator[String] 
  def hasNext = resultSet.next()
  def next() = resultSet.getString(1)
.toStream

【讨论】:

看起来很完美。我将在设置数据库后立即对其进行测试。我什至认为我不需要将其转换为Stream。我可以直接向它申请mapfilter等。 我想给你第二次投票。我已将此代码片段添加到我的 Scala sn-ps 库中。它正迅速成为我的最爱之一。 这是一个很酷的解决方案,但我担心。我认为Iterator 的通常合同是hasNext 是无副作用的。它可以在两次调用next 之间调用任意次数。有什么东西可以防止这成为一个问题吗? mysql-connector-java 版本 6 中对我不起作用。不确定我是否做错了什么,但我的 ResultSet 在第二个 next() 呼叫中关闭,所以我只能检索一个结果行。在获得所有行之前它不会自动关闭的唯一方法似乎是使用while (rs.next()) ...,因此我将项目单独添加到while 中的scala.collection.mutable.ListBuffer。看起来不漂亮,但想不出任何其他方式。 @Nick 使用 new Iterator[String] ... .toList 而不是 .toStream 将立即获取整个结果集,而不仅仅是第一行。【参考方案2】:

@elbowich 回答的实用功能:

def results[T](resultSet: ResultSet)(f: ResultSet => T) = 
  new Iterator[T] 
    def hasNext = resultSet.next()
    def next() = f(resultSet)
  

允许您使用类型推断。例如:

stmt.execute("SELECT mystr, myint FROM mytable")

// Example 1:
val it = results(stmt.resultSet) 
  case rs => rs.getString(1) -> 100 * rs.getInt(2)

val m = it.toMap // Map[String, Int]

// Example 2:
val it = results(stmt.resultSet)(_.getString(1))

【讨论】:

【参考方案3】:

这听起来像是一个隐式类的好机会。首先在某处定义隐式类:

import java.sql.ResultSet

object Implicits 

    implicit class ResultSetStream(resultSet: ResultSet) 

        def toStream: Stream[ResultSet] = 
            new Iterator[ResultSet] 
                def hasNext = resultSet.next()

                def next() = resultSet
            .toStream
        
    

接下来,只需在执行查询并定义 ResultSet 对象的任何位置导入这个隐式类:

import com.company.Implicits._

最后使用 toStream 方法取出数据。比如获取所有的id如下图:

val allIds = resultSet.toStream.map(result => result.getInt("id"))

【讨论】:

你确定它有效吗?它在 DB2 上失败,ResultSet 被关闭。如果这适用于您的情况,可能取决于特定的数据库品牌和/或配置? 可以,但只要您的连接保持打开状态,您就只能使用该流。如果您关闭连接,则流将失败,迭代器也会失败。【参考方案4】:

我需要类似的东西。在肘部非常酷的答案的基础上,我将它包装了一下,而不是字符串,我返回结果(这样你就可以得到任何列)

def resultSetItr(resultSet: ResultSet): Stream[ResultSet] = 
    new Iterator[ResultSet] 
      def hasNext = resultSet.next()
      def next() = resultSet
    .toStream
  

我需要访问表元数据,但这适用于表行(可以执行 stmt.executeQuery(sql) 而不是 md.getColumns):

 val md = connection.getMetaData()
 val columnItr = resultSetItr( md.getColumns(null, null, "MyTable", null))
      val columns = columnItr.map(col => 
        val columnType = col.getString("TYPE_NAME")
        val columnName = col.getString("COLUMN_NAME")
        val columnSize = col.getString("COLUMN_SIZE")
        new Column(columnName, columnType, columnSize.toInt, false)
      )

【讨论】:

如果您不需要返回流(例如,仅向前迭代),您可以使用迭代器。这大大减少了使用流的内存开销(返回Iterator[ResultSet],并删除toStream【参考方案5】:

因为 ResultSet 只是一个被 next 导航的可变对象,我们需要定义我们自己的下一行的概念。我们可以使用如下的输入函数来做到这一点:

class ResultSetIterator[T](rs: ResultSet, nextRowFunc: ResultSet => T) 
extends Iterator[T] 

  private var nextVal: Option[T] = None

  override def hasNext: Boolean = 
    val ret = rs.next()
    if(ret) 
      nextVal = Some(nextRowFunc(rs))
     else 
      nextVal = None
    
    ret
  

  override def next(): T = nextVal.getOrElse  
    hasNext 
    nextVal.getOrElse( throw new ResultSetIteratorOutOfBoundsException 
  )

  class ResultSetIteratorOutOfBoundsException extends Exception("ResultSetIterator reached end of list and next can no longer be called. hasNext should return false.")

编辑: 转换为流或其他内容。

【讨论】:

【参考方案6】:
Iterator.continually(rs.next())
  .takeWhile(identity)
  .map(_ => Model(
      id = rs.getInt("id"),
      text = rs.getString("text")
   ))

【讨论】:

【参考方案7】:

这里有一个替代方案,类似于 Sergey Alaev 和 thoredge 的解决方案,当我们需要一个符合 Iterator 合同的解决方案时,hasNext 没有副作用。

假设一个函数f: ResultSet => T

Iterator.unfold(resultSet.next())  hasNext =>
  Option.when(hasNext)(f(resultSet), resultSet.next())

我发现在ResultSet 上使用map“扩展方法”很有用。

implicit class ResultSetOps(resultSet: ResultSet) 
    def map[T](f: ResultSet => T): Iterator[T] = 
      Iterator.unfold(resultSet.next())  hasNext =>
        Option.when(hasNext)(f(resultSet), resultSet.next())
      
    
  

【讨论】:

【参考方案8】:

这个实现虽然更长更笨拙,但它更符合 ResultSet 契约。副作用已从 hasNext(...) 中移除并移至 next()。

new Iterator[String] 
  private var available = resultSet.next()
  override def hasNext: Boolean = available
  override def next(): String = 
    val string = resultSet.getString(1)
    available = resultSet.next()
    string
  

【讨论】:

【参考方案9】:

我认为上述大多数实现都有一个不确定的hasNext 方法。调用它两次会将光标移动到第二行。我建议使用类似的东西:

  new Iterator[ResultSet] 
    def hasNext = 
      !resultSet.isLast
    
    def next() = 
      resultSet.next()
      resultSet
    
  

【讨论】:

【参考方案10】:

上面的另一个变体,适用于 Scala 2.12:

implicit class ResultSetOps(resultSet: ResultSet) 
 def map[T](f: ResultSet => T): Iterator[T] =
  Iterator.continually(resultSet).takeWhile(_.next()).map(f)

【讨论】:

以上是关于将 SQL ResultSet 视为 Scala 流的主要内容,如果未能解决你的问题,请参考以下文章

Scala:通过生成器(可迭代)公开 JDBC ResultSet

获取MySQL的JDBC连接对象(Scala版本)

ResultSet#getDate() 语义

一次性获取`java.sql.ResultSet`行中的所有值

如何将 ResultSet 转换为有用的整数?

JDBC中的ResultSet