Slick 3.0 批量插入或更新(更新插入)

Posted

技术标签:

【中文标题】Slick 3.0 批量插入或更新(更新插入)【英文标题】:Slick 3.0 bulk insert or update (upsert) 【发布时间】:2016-05-02 06:53:42 【问题描述】:

在 Slick 3.0 中进行批量 insertOrUpdate 的正确方法是什么?

我正在使用 mysql 来进行适当的查询

INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);

MySQL bulk INSERT or UPDATE

这是我当前的代码,非常慢:-(

// FIXME -- this is slow but will stop repeats, an insertOrUpdate
// functions for a list would be much better
val rowsInserted = rows.map 
  row => await(run(TableQuery[FooTable].insertOrUpdate(row)))
.sum

我正在寻找的是相当于

def insertOrUpdate(values: Iterable[U]): DriverAction[MultiInsertResult, NoStream, Effect.Write]

【问题讨论】:

【参考方案1】:

有几种方法可以使这段代码更快(每一种方法应该都比前面的方法快,但它会逐渐减少惯用的流畅度):

如果在 slick-pg 0.16.1+ 上运行 insertOrUpdateAll 而不是 insertOrUpdate

await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum

一次性运行所有 DBIO 事件,而不是等待每个事件都提交后再运行下一个事件:

val toBeInserted = rows.map  row => TableQuery[FooTable].insertOrUpdate(row) 
val inOneGo = DBIO.sequence(toBeInserted)
val dbioFuture = run(inOneGo)
// Optionally, you can add a `.transactionally`
// and / or `.withPinnedSession` here to pin all of these upserts
// to the same transaction / connection
// which *may* get you a little more speed:
// val dbioFuture = run(inOneGo.transactionally)
val rowsInserted = await(dbioFuture).sum

下拉到 JDBC 级别并一次性运行您的 upsert (idea via this answer):

val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);"""

SimpleDBIO[List[Int]]  session =>
  val statement = session.connection.prepareStatement(SQL)
  rows.map  row =>
    statement.setInt(1, row.a)
    statement.setInt(2, row.b)
    statement.setInt(3, row.c)
    statement.addBatch()
  
  statement.executeBatch()

【讨论】:

酷。特别感谢您提供的第二种技术。我不知道这个 只是为了仔细检查:第一个解决方案不是批量插入,是吗?看起来它是在并行机器人中进行所有插入而不是批处理,不是吗? 在第一个解决方案中,我是否将往返行程保存到 mysql 服务器? +1 for .transactionally - 仅此一项就提高了插入 50,000 行的性能,从 3 分钟缩短到 1 分钟。 自 slick-pg 0.16.1 以来,有一个 .insertOrUpdateAll 用于使用 Slick 和 postgresql 进行批量更新【参考方案2】:

正如您在Slick examples 看到的,您可以使用++= 函数通过JDBC 批量插入功能进行插入。每个实例:

val foos = TableQuery[FooTable]
val rows: Seq[Foo] = ...
foos ++= rows // here slick will use batch insert

您还可以通过“分组”行序列来“调整”批量:

val batchSize = 1000
rows.grouped(batchSize).foreach  group => foos ++= group 

【讨论】:

谢谢,但我不认为 ++= 会进行 insertOrUpdate。我相信它只是插入,在我的情况下,如果有重复的行会抛出完整性异常【参考方案3】:

使用sql

这个演示作品

case ("insertOnDuplicateKey",answers:List[Answer])=>
  def buildInsert(r: Answer): DBIO[Int] =
    sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values ($r.aid,$r.bid,$r.sbid,$r.qid,$r.ups,$r.author,$r.uid,$r.nick,$r.pub_time,$r.content,$r.good,$r.hot,$r.id,$r.reply,$r.pic,$r.spider_time) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
  val inserts: Seq[DBIO[Int]] = answers.map(buildInsert)
  val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts)
  DEST_DB.run(combined).onComplete(data=>
    println("insertOnDuplicateKey data result",data.get.mkString)
    if (data.isSuccess)
      println(data.get)
      val lastid=answers.last.id
      Sync.lastActor !("upsert",tablename,lastid)
    else
      //retry
      self !("insertOnDuplicateKey",answers)
    
  )

我尝试在单个 sql 中使用 sqlu,但错误可能是 sqlu 不提供字符串插值

这个演示不起作用

case ("insertOnDuplicateKeyError",answers:List[Answer])=>
  def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter"
  val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time)  values "
  val execafter=" ON DUPLICATE KEY UPDATE  `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
  val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"'"+row.author+"'","'"+row.uid+"'","'"+row.nick+"'","'"+row.pub_time+"'","'"+row.content+"'",row.good,row.hot,row.id,row.reply,row.pic,"'"+row.spider_time+"'").mkString(",")+")")).mkString(",\n")
  val insertOrUpdateAction=DBIO.seq(
    buildSql(execpre,valuesstr,execafter)
  )
  DEST_DB.run(insertOrUpdateAction).onComplete(data=>
    if (data.isSuccess)
      println("insertOnDuplicateKey data result",data)
      //retry
      val lastid=answers.last.id
      Sync.lastActor !("upsert",tablename,lastid)
    else
      self !("insertOnDuplicateKey2",answers)
    
  )

一个带有 scala slick 的 mysql 同步工具 https://github.com/cclient/ScalaMysqlSync

【讨论】:

以上是关于Slick 3.0 批量插入或更新(更新插入)的主要内容,如果未能解决你的问题,请参考以下文章

MySQL+MyBatis一条命令批量插入或更新

MySQL+MyBatis一条命令批量插入或更新

HIbernate 批量插入或更新在 Spring Boot 中不起作用

使用 Hibernate 批量插入或更新?

批量插入或更新数据

使用 Petapoco 批量插入/更新