scala slick mysql 字段过多 tuple问题

Posted 吾生也有涯,而知也无涯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala slick mysql 字段过多 tuple问题相关的知识,希望对你有一定的参考价值。

原同步服务运行正常;后因业务需要,对方在单表中新增了字段,字段总数超过了 22 个。

sbt assembly 编译出错

too many elements for tuple: 26, allowed: 22

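Scala 2 的 Tuple(以及 Function)最多只支持 22 个元素。自 Scala 2.11 起 case class 本身可以超过 22 个字段,但此时不再生成 tupled / unapply,因此无法使用 Slick 基于元组的映射(`<>`)。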

遂找解决办法

https://underscore.io/blog/posts/2016/10/11/twenty-two.html

https://github.com/slick/slick/issues/519#issuecomment-48327043

https://github.com/underscoreio/slickless/issues/16

 

最终应用slickless解决

部分示例

import slick.jdbc.H2Profile.api._
import shapeless.{ HList, ::, HNil, Generic }
import slickless._

/** Row type for the `users` table example: a numeric id plus an e-mail address. */
case class User(
  id: Long,
  email: String
)

/** Slick table definition for the "users" table, mapped to [[User]] rows.
  *
  * The default projection is built from an HList of columns and mapped through
  * shapeless `Generic[User]` (via slickless `mappedWith`) instead of a tuple.
  */
class Users(tag: Tag) extends Table[User](tag, "users") {

  /** Auto-incrementing primary key column "id". */
  def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

  /** Column "email". */
  def email: Rep[String] = column[String]("email")

  /** Default projection: the column HList converted to/from [[User]]. */
  def * = (id :: email :: HNil).mappedWith(Generic[User])
}

/** Query entry point for the [[Users]] table; created lazily on first access. */
lazy val users: TableQuery[Users] = TableQuery[Users]

 

 

基于 tupled 的写法:读取和写入时都需要在元组与 case class 之间做转换

    // Page fetch handler (tuple-based mapping).
    //
    // Reads up to PAGE_SIZE rows with id > lastid from ORIGIN_DB, converts each
    // tuple row to a Word and then:
    //   - non-empty page: waits 1 second and sends the rows back to `self` as
    //     BYOFFSET_INSERT or BYOFFSET_INSERT_UPDATE depending on `is_just_insert`;
    //   - empty page or failed query: waits 5 minutes and re-sends
    //     (BYOFFSET_GET, lastid) to `self`.
    //
    // The query result is pattern matched instead of calling `data.get` up front:
    // `get` on a failed Try throws inside the callback, which previously meant the
    // failure branch (and therefore the retry) was never reached.
    // NOTE(review): `self` / `!` look like an actor's receive block — confirm.
    case (BYOFFSET_GET,lastid:Int) => {
      println("sync start by",lastid)
      val wordTable = TableQuery[Words]

      // Sleeps 5 minutes on a Future, then asks for the same offset again.
      def retryInFiveMinutes(label: String): Future[Unit] = Future {
        println(label + " 5m start")
        TimeUnit.MINUTES.sleep(5)
        println(label + " 5m finished")
        self !(BYOFFSET_GET,lastid)
      }

      val pageQuery = ORIGIN_DB.run(wordTable.filter(_.id > lastid).take(PAGE_SIZE).result)
      pageQuery.onComplete {
        case scala.util.Success(rows) =>
          println("sync get data",rows.length)
          val words = rows.toList.map(row => Word.tupled(row))
          if (words.nonEmpty) {
            Future {
              println(s"Blocking next page 1s start")
              TimeUnit.SECONDS.sleep(1)
              println(s"Blocking next page 1s finished")
              if (is_just_insert) {
                // Sync only ever inserts new rows.
                self !(BYOFFSET_INSERT,words)
              } else {
                // Historical rows may change, so insert-or-update.
                self !(BYOFFSET_INSERT_UPDATE,words)
              }
            }
          } else {
            // Already caught up with the newest data: wait 5 minutes.
            retryInFiveMinutes("Blocking future")
          }
        case scala.util.Failure(cause) =>
          // Surface the error instead of dropping it, then retry later.
          println("sync get data failed",cause)
          retryInFiveMinutes("Blocking table "+tablename+" future")
      }
    }
    // Insert handler (tuple-based mapping).
    //
    // Converts every Word back to its tuple form, bulk-inserts the batch into
    // DEST_DB and, on success, reports the id of the last row to Sync.lastActor
    // as the new offset for `tablename`. On failure the same batch is re-sent to
    // `self`.
    //
    // Changes from the earlier version:
    //   - the stray `word.++=(...)` statement was removed; it only built an action
    //     that was never passed to DEST_DB.run;
    //   - `lastOption` replaces `last`, so an empty batch no longer throws inside
    //     the completion callback;
    //   - the failure cause is printed before retrying.
    // NOTE(review): `List[Word]` in the pattern is unchecked because of erasure;
    // only the List part is actually tested at runtime.
    // NOTE(review): the retry is immediate and unbounded — a permanent error
    // (e.g. a constraint violation) would loop forever; confirm this is intended.
    case (BYOFFSET_INSERT,words:List[Word])=>{
      println("insert start",words.length)
      val wordTable = TableQuery[Words]
      // A case-class `unapply` always yields Some, so flatMap keeps every row.
      val rows = words.flatMap(w => Word.unapply(w))
      val insertActions = DBIO.seq(wordTable ++= rows)
      DEST_DB.run(insertActions).onComplete {
        case done @ scala.util.Success(_) =>
          println("insert data result",done)
          // After a successful insert, update the last-offset table.
          words.lastOption.foreach { newest =>
            Sync.lastActor !(BYOFFSET_UPSERT_OFFSET,tablename,newest.id)
          }
        case scala.util.Failure(cause) =>
          println("insert data failed",cause)
          self !(BYOFFSET_INSERT,words)
      }
    }

 

基于 HList / Generic(slickless)的写法:查询结果直接就是 case class,不必再做上述转换

  // Page fetch handler (HList/Generic mapping).
  //
  // Reads up to PAGE_SIZE rows with id > lastid from ORIGIN_DB; the rows are used
  // as returned by the query, with no tuple conversion. Then:
  //   - non-empty page: waits 1 second and sends the rows to `self` as
  //     BYOFFSET_INSERT or BYOFFSET_INSERT_UPDATE depending on `is_just_insert`;
  //   - empty page or failed query: waits 5 minutes and re-sends
  //     (BYOFFSET_GET, lastid) to `self`.
  //
  // The result is pattern matched instead of calling `data.get` before the
  // success check: `get` on a failed Try throws inside the callback, which
  // previously prevented the failure branch (the retry) from ever running.
  // NOTE(review): `self` / `!` look like an actor's receive block — confirm.
  case (BYOFFSET_GET,lastid:Int) => {
      println("table "+tablename+"  sync start by",lastid)
      val userTable = TableQuery[Users]

      // Sleeps 5 minutes on a Future, then asks for the same offset again.
      def retryInFiveMinutes(): Future[Unit] = Future {
        println(s"Blocking table "+tablename+"  future 5m start")
        TimeUnit.MINUTES.sleep(5)
        println(s"Blocking table "+tablename+"  future 5m finished")
        self !(BYOFFSET_GET,lastid)
      }

      val pageQuery = ORIGIN_DB.run(userTable.filter(_.id > lastid).take(PAGE_SIZE).result)
      pageQuery.onComplete {
        case scala.util.Success(rows) =>
          println("table "+tablename+"  sync get data",rows.length)
          val users = rows.toList
          if (users.nonEmpty) {
            Future {
              println(s"Blocking table "+tablename+"  next page 1s start")
              TimeUnit.SECONDS.sleep(1)
              println(s"Blocking table "+tablename+"  next page 1s finished")
              if (is_just_insert) {
                // Sync only ever inserts new rows.
                self !(BYOFFSET_INSERT,users)
              } else {
                // Historical rows may change, so insert-or-update.
                self !(BYOFFSET_INSERT_UPDATE,users)
              }
            }
          } else {
            // Nothing new yet: poll again later.
            retryInFiveMinutes()
          }
        case scala.util.Failure(cause) =>
          // Surface the error instead of dropping it, then retry later.
          println("table "+tablename+"  sync get data failed",cause)
          retryInFiveMinutes()
      }
    }
    // Insert handler (HList/Generic mapping).
    //
    // Bulk-inserts the batch of User rows into DEST_DB as-is and, on success,
    // reports the id of the last row to Sync.lastActor as the new offset for
    // `tablename`. On failure the same batch is re-sent to `self`.
    //
    // `lastOption` replaces `last`, so an empty batch no longer throws inside the
    // completion callback, and the failure cause is printed before retrying.
    // NOTE(review): `List[User]` in the pattern is unchecked because of erasure;
    // only the List part is actually tested at runtime.
    // NOTE(review): the retry is immediate and unbounded — a permanent error
    // (e.g. a constraint violation) would loop forever; confirm this is intended.
    case (BYOFFSET_INSERT,users:List[User])=>{
      println("table "+tablename+"  insert start",users.length)
      val userTable = TableQuery[Users]
      val insertActions = DBIO.seq(userTable ++= users)
      DEST_DB.run(insertActions).onComplete {
        case done @ scala.util.Success(_) =>
          println("table "+tablename+"  insert data result",done)
          // After a successful insert, update the last-offset table.
          users.lastOption.foreach { newest =>
            Sync.lastActor !(BYOFFSET_UPSERT_OFFSET,tablename,newest.id)
          }
        case scala.util.Failure(cause) =>
          println("table "+tablename+"  insert data failed",cause)
          self !(BYOFFSET_INSERT,users)
      }
    }

scala slick 异构表同步服务

https://github.com/cclient/ScalamysqlSync







以上是关于scala slick mysql 字段过多 tuple问题的主要内容,如果未能解决你的问题,请参考以下文章

scala slick mysql utf8mb4 支持

如何在 Scala Slick 中运行补丁/部分数据库更新?

Scala入门到精通——第二十九节 Scala数据库编程

为不存在的表 slick scala (Slick 3.0.0, scala) 创建一个类 Table

Scala / Slick,“等待连接 20000 毫秒后超时”错误

在 Scala 中使用 Slick 库获取自动增量值