SDP:ScalikeJDBC- JDBC-Engine:Streaming

Posted 雪川大虫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SDP:ScalikeJDBC- JDBC-Engine:Streaming相关的知识,希望对你有一定的参考价值。

  作为一种通用的数据库编程引擎,用Streaming来应对海量数据的处理是必备功能。同样,我们还是通过一种Context传递产生流的要求。因为StreamingContext比较简单,而且还涉及到数据抽取函数extractor的传递,所以我们分开来定义:

case class JDBCQueryContext[M](
                        dbName: Symbol,
                        statement: String,
                        parameters: Seq[Any] = Nil,
                        fetchSize: Int = 100,
                        autoCommit: Boolean = false,
                        queryTimeout: Option[Int] = None,
                        extractor: WrappedResultSet => M)

由于我们会将JDBCQueryContext传给JDBC-Engine去运算,所以Streaming函数的所有参数都必须明确定义,包括extractor函数。实际上JDBCQueryContext也完全满足了jdbcQueryResult函数。我们会在后面重新设计这个函数。JDBCStreaming函数产生一个akka-Source,如下:

def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
                         (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

      val publisher: DatabasePublisher[A] = NamedDB(h2) readOnlyStream {
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)

        sql.iterator
          .withDBSessionForceAdjuster(session => {
            session.connection.setAutoCommit(ctx.autoCommit)
            session.fetchSize(ctx.fetchSize)
          })
       }
      Source.fromPublisher[A](publisher)
    }

 我们只需要提供一个Sink就可以使用这个akka-stream了:

import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._

object JDBCStreaming extends App {

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  ConfigDBsWithEnv("dev").setup(h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()


  case class DataRow(year: String, state: String, county: String, value: String)

  //data row converter
  val toRow = (rs: WrappedResultSet) => DataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  //construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = h2,
    statement = "select * from AIRQM",
    extractor = toRow
  )

  //pass context to construct akka-source
  val akkaSource = jdbcAkkaStream(ctx)
  //a sink for display rows
  val snk = Sink.foreach[(DataRow,Long)] { case (row,idx) =>
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")}
  //can manual terminate stream by kill.shutdown
  val kill: UniqueKillSwitch = (akkaSource.zipWithIndex).viaMat(KillSwitches.single)(Keep.right).to(snk).run


  scala.io.StdIn.readLine()
  kill.shutdown()
  actorSys.terminate()
  println("+++++++++++++++")

}

试运行结果OK。下面是新版本的jdbcQueryResult函数:

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCQueryContext[A])(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
          sql.collection.apply[C]()

    }

试运行:

 object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._

  def toRow: WrappedResultSet => CountyModel = rs =>
     CountyModel(id=rs.int("id"),name=rs.string("name"))
  //construct the context
  val slickCtx = JDBCQueryContext[CountyModel](
    dbName = h2,
    statement = "select * from county where id > ? and id < ?",
    parameters = Seq(6000,6200),
    extractor = toRow
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx)
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

下面是本次讨论的示范源代码:

build.sbt

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4"
)

resources/application.conf

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

JDBCEngine.scala

package jdbccontext
import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import scala.util._
import scalikejdbc.TxBoundary.Try._

import scala.concurrent.ExecutionContextExecutor

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXEDDL= 1
    val SQL_UPDATE = 2
    val RETURN_GENERATED_KEYVALUE = true
    val RETURN_UPDATED_COUNT = false

  }

case class JDBCQueryContext[M](
                        dbName: Symbol,
                        statement: String,
                        parameters: Seq[Any] = Nil,
                        fetchSize: Int = 100,
                        autoCommit: Boolean = false,
                        queryTimeout: Option[Int] = None,
                        extractor: WrappedResultSet => M)


case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String] = Nil,
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Seq[Option[Any]] = Nil,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None) {

    ctx =>

    //helper functions

    def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

    def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

    def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

    def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

    def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(preAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(postAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(Seq(_parameters))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(_parameters),
          returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendBatchParameters(_parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
        throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

      var matchParams = true
      if (ctx.parameters != Nil)
        if (ctx.parameters.head.size != _parameters.size)
          matchParams = false
      if (matchParams) {
        ctx.copy(
          parameters = ctx.parameters ++ Seq(_parameters)
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
    }

    def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
         throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
      ctx.copy(
        returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
      )
    }

    def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
      }

      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
      }
      def setBatchCommand(_statement: String): JDBCContext = {
        ctx.copy (
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
      }
  }

  object JDBCEngine {

    import JDBCContext._

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
                         (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

      val publisher: DatabasePublisher[A] = NamedDB(h2) readOnlyStream {
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)

        sql.iterator
          .withDBSessionForceAdjuster(session => {
            session.connection.setAutoCommit(ctx.autoCommit)
            session.fetchSize(ctx.fetchSize)
          })
       }
      Source.fromPublisher[A](publisher)
    }


    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCQueryContext[A])(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
          sql.collection.apply[C]()

    }

    def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
       if (ctx.sqlType != SQL_EXEDDL) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be ‘SQL_EXEDDL‘!"))
      }
      else {
        NamedDB(ctx.dbName) localTx { implicit session =>
          Try {
                ctx.statements.foreach { stm =>
                  val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                    before = WrappedResultSet => {})(
                    after = WrappedResultSet => {})

                  ddl.apply()
              }
            "SQL_EXEDDL executed succesfully."
          }
        }
      }
    }

    def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be ‘SQL_UPDATE‘!"))
      }
      else {
        if (ctx.batch) {
          if (noReturnKey(ctx)) {
            val usql = SQL(ctx.statements.head)
              .tags(ctx.queryTags: _*)
              .batch(ctx.parameters: _*)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[Seq]()
                Seq.empty[Long].to[C]
              }
            }
          } else {
            val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[C]()
              }
            }
          }

        } else {
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
        }
      }
    }
     private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
       implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
       val Some(key) :: xs = ctx.returnGeneratedKey
       val params: Seq[Any] = ctx.parameters match {
         case Nil => Nil
         case [email protected]_ => p.head
       }
       val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
       Try {
         NamedDB(ctx.dbName) localTx { implicit session =>
           session.fetchSize(ctx.fetchSize)
           ctx.queryTimeout.foreach(session.queryTimeout(_))
           val result = usql.apply()
           Seq(result).to[C]
         }
       }
     }

      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val params: Seq[Any] = ctx.parameters match {
        case Nil => Nil
        case [email protected]_ => p.head
      }
      val before = ctx.preAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val after = ctx.postAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
      Try {
        NamedDB(ctx.dbName) localTx {implicit session =>
          session.fetchSize(ctx.fetchSize)
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          val result = usql.apply()
          Seq(result.toLong).to[C]
        }
      }

    }

    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (noReturnKey(ctx))
        singleTxUpdateNoReturnKey(ctx)
      else
        singleTxUpdateWithReturnKey(ctx)
    }

    private def noReturnKey(ctx: JDBCContext): Boolean = {
      if (ctx.returnGeneratedKey != Nil) {
        val k :: xs = ctx.returnGeneratedKey
         k match {
          case None => true
          case Some(k) => false
        }
      } else true
    }

    def noActon: PreparedStatement=>Unit = pstm => {}

    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
              case Nil => Seq.fill(ctx.statements.size)(None)
              case [email protected]_ => k
            }
            val sqlcmd = ctx.statements zip ctx.parameters zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
     }


    def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be ‘SQL_UPDATE‘!"))
      }
      else {
        if (!ctx.batch) {
          if (ctx.statements.size == 1)
            singleTxUpdate(ctx)
          else
            multiTxUpdates(ctx)
        } else
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))

      }
    }

  }

JDBCQueryDemo.scala

import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._

object JDBCStreaming extends App {

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  ConfigDBsWithEnv("dev").setup(h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()


  case class DataRow(year: String, state: String, county: String, value: String)

  //data row converter
  val toRow = (rs: WrappedResultSet) => DataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  //construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = h2,
    statement = "select * from AIRQM",
    extractor = toRow
  )

  //pass context to construct akka-source
  val akkaSource = jdbcAkkaStream(ctx)
  //a sink for display rows
  val snk = Sink.foreach[(DataRow,Long)] { case (row,idx) =>
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")}
  //can manual terminate stream by kill.shutdown
  val kill: UniqueKillSwitch = (akkaSource.zipWithIndex).viaMat(KillSwitches.single)(Keep.right).to(snk).run


  scala.io.StdIn.readLine()
  kill.shutdown()
  actorSys.terminate()
  println("+++++++++++++++")

  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._

  def toCounty: WrappedResultSet => CountyModel = rs =>
    CountyModel(id=rs.int("id"),name=rs.string("name"))
  //construct the context
  val slickCtx = JDBCQueryContext[CountyModel](
    dbName = h2,
    statement = "select * from county where id > ? and id < ?",
    parameters = Seq(6000,6200),
    extractor = toCounty
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx)
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))


}

 

以上是关于SDP:ScalikeJDBC- JDBC-Engine:Streaming的主要内容,如果未能解决你的问题,请参考以下文章

SDP:ScalikeJDBC- JDBC-Engine:Streaming

ScalaScala使用scalikejdbc工具连接MySQL(推荐)

更新ScalikeJDBC中的返回查询

scalikejdbc 学习笔记

scalikejdbc 学习笔记

scalikejdbc 学习笔记