SDP:ScalikeJDBC- JDBC-Engine:Fetching

Posted 雪川大虫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SDP:ScalikeJDBC- JDBC-Engine:Fetching相关的知识,希望对你有一定的参考价值。

  ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹了。理论上用ScalikeJDBC作为一种JDBC-Engine还是比较理想的:让它处于各种JDBC工具库和数据库实例之间接收JDBC运算指令然后连接目标数据库进行相关运算后返回结果。一般来说,各种JDBC工具库如ORM,FRM软件通过各自的DSL在复杂的数据库表关系环境内进行数据管理编程,最终产生相关的SQL语句即(prepared)statement+parameters传递给指定类型的数据库JDBC驱动程序去运算并产生结果。如果这样描述,那么JDBC-Engine主要的功能就是支持下面这个函数:

jdbcRunSQL(context: JDBCContext): JDBCResultSet

这个函数的用户提供一个JDBCContext类型值,然后由jdbcRunSQL进行接下来的运算并返回结果。从这个角度分析,JDBCContext最起码需要提供下面的属性: 

1、数据库连接:选择数据库连接池

2、运算参数:fetchSize, queryTimeout,queryTag。这几个参数都针对当前运算的SQL

3、Query参数:

    Query类型:select/execute/update、单条/成批、前置/后置query、generateKey

    SQL语句:statement:Seq[String]、parameters: Seq[Option[Seq[Any]]]

下面就是JDBCContext类型定义

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)
重新考虑了一下,觉着把jdbc读写分开两个函数来实现更容易使用,因为这样比较符合编程模式和习性。所以最好把sqlType=SQL_SELECT类型SQL独立一个函数出来运算:
   def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case [email protected]_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be ‘SQL_SELECT‘!")
      }
    }

还需要提供noExtractor函数来符合SQLToCollectionImpl类型的参数款式要求:

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }
我们来测试用一下jdbcQueryResult:
import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")
}

如果我们使用Slick-DSL进行数据库管理编程后应该如何与JDBC-Engine对接:

 object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

输出正确。

下面就是本次示范的源代码:

 build.sbt

name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf 包括H2,MySQL,PostgreSQL

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10

HikariConfig.scala  HikariCP连接池实现

package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf

  }
}

import scalikejdbc._
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }

}


object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}

JDBCEngine.scala jdbcQueryResult函数实现

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)

  object JDBCEngine {

    import JDBCContext._

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case [email protected]_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be ‘SQL_SELECT‘!")
      }
    }

  }

JDBCQueryDemo.scala  功能测试代码

import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")


  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))


}

 

 

 

 

 

 

 

 

 

 

 

 


以上是关于SDP:ScalikeJDBC- JDBC-Engine:Fetching的主要内容,如果未能解决你的问题,请参考以下文章

SDP:ScalikeJDBC- JDBC-Engine:Streaming

ScalaScala使用scalikejdbc工具连接MySQL(推荐)

更新ScalikeJDBC中的返回查询

scalikejdbc 学习笔记

scalikejdbc 学习笔记

scalikejdbc 学习笔记