数据库批量执行实践(go语言)
Posted onecing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据库批量执行实践(go语言)相关的知识,希望对你有一定的参考价值。
数据库的高效执行(查询、写入),除了,设置索引、数据库配置调优、表结构在业务上设置合理、分库分表?、读写分离等等之外,要想改善数据的写入速度,一大杀器是批量插入,此批量并不是大事务,而是利用数据库的特性,insert xxx values (?,?,?),(?,?,?) 的特性,实测效率比单个单个的插入 快10倍以上。
那么批量执行,效率如此之高,能够写出一个自动的批量执行的控件就显得尤为必要
1:能够到一定数据,开始批量插入
2:没到一定的数目,但是时间点到了,也要批量插入。
满足这两个条件,同时避免代码随处乱用,就有了如下的代码方案:
我看到go-zero中也有这样的实现控件,但是看了看,觉得与整体耦合性太大,不易单独拿出来,封装性有点过度,封装的使用,锁的使用,还是有点云里雾里,不太直观。
倒不如,理解其意,自己写一个来的方便。
一:go-zero bulkinsert的简单逻辑
增加任务时,做判断,
加入 Task,
开启定时器,分别等待任务执行通知和定时器到期
if >= macCount
任务(args) -> commander
等待任务执行完
二:我的方案
1:增加了 一个缓冲区,保证执行时,能够继续入缓冲,不必等待语句执行,
2:可以自己控制缓冲区的大小,可以控制批量执行的数目(比如sql过长,执行不成功,控制数目可以避免这种情况。)
batchinsert.go
package main
import (
"fmt"
"runtime/debug"
"strings"
"sync"
"time"
)
// DataChanSize benchmark 数据chan加大,把所有数据都第一时间接收下来,可以灵活调整
const DataChanSize = 1000000 // benchmark
type BatchDataType struct
sync.Mutex
data []interface // params to batchFun
dataChan chan interface // buf
timerCh chan struct // notify
batchCh chan struct // notify
batchFun func([]interface) error
batchIntervel time.Duration
batchCount int
func NewBatchOperator(count int, interval time.Duration, customBatchFun func([]interface) error) *BatchDataType
if count < 1
panic("batch count not < 1")
if nil == customBatchFun
panic("batch fun must not nil")
m := &BatchDataType
data: make([]interface, 0, count),
// dataChan的作用,是在批量执行data数据时,其他地方还可以继续放入dataChan,不然都得等待数据库批量执行
dataChan: make(chan interface, DataChanSize), // benchmark
timerCh: make(chan struct),
batchCh: make(chan struct),
batchFun: customBatchFun,
batchIntervel: interval,
batchCount: count,
go m.prepareBatchData()
return m
func (m *BatchDataType) prepareBatchData()
for
v := <-m.dataChan
if m.batchCount == len(m.data)
close(m.timerCh)
// 等待批量执行完
//Log.Info("-----begin batch", time.Now())
<-m.batchCh
//Log.Info("-----end batch", time.Now())
m.Lock()
m.data = append(m.data, v)
if 1 == len(m.data)
m.timerCh = make(chan struct)
m.batchCh = make(chan struct)
go m.taskTimer(time.NewTicker(m.batchIntervel))
m.Unlock()
func (m *BatchDataType) PutData(v interface)
m.dataChan <- v
func (m *BatchDataType) batchInsertData()
defer func()
if err := recover(); err != nil
Log.Error("panic error: ", string(debug.Stack()))
()
m.Lock()
defer m.Unlock()
// 通知执行完了
defer close(m.batchCh)
//Log.Infof("----begin insert----len:[%d]\\n", len(m.data))
// 执行外部的批处理函数
m.batchFun(m.data)
// 清空数据
m.data = m.data[:0]
//Log.Infof("-----end insert----len:[%d]\\n", len(m.data))
func (m *BatchDataType) taskTimer(ticker *time.Ticker)
defer ticker.Stop()
select
case <-ticker.C:
//Log.Info("-----timer----")
m.batchInsertData()
case <-m.timerCh:
//Log.Info("----ch-----")
m.batchInsertData()
func insertDataBatch(stmtBase string, valueStrings []string, valueArgs []interface, fieldNum int) (err error)
lenAllRows := len(valueStrings)
for start := 0; start < lenAllRows; start += _batchNum
end := start + _batchNum
if end > lenAllRows
end = lenAllRows
batchValueStrings := valueStrings[start:end]
batchValueArgs := valueArgs[start*fieldNum : end*fieldNum]
stmt := fmt.Sprintf(stmtBase, strings.Join(batchValueStrings, ","))
_, err = _db.Exec(stmt, batchValueArgs...)
if nil != err
Log.Errorf("insertDataBatch err[%v], stmtBase[%s]", err, stmtBase)
return err
return
其他地方的使用:
// 1:定义批处理个数
var _batchNum = 1000
// 2:初始化时定义 执行对象
var _myDataBatcher = NewBatchOperator(_batchNum, 2*time.Second, batchInsertMyData)
// 3:定义 消息结构体
type MyDataBatchInfo struct
idStr string
datetime string
// 或者其他数据
protoData []*ProtoData.MyDataInfo
// 4:定义 自定义处理函数
func batchInsertMyData(arrMyData []interface) error
sParam := "(" + strings.Repeat(",?", 26)[1:] + ")"
valueStrings := make([]string, 0, len(arrMyData))
fieldNum := strings.Count(sParam, "?")
valueArgs := make([]interface, 0, len(arrMyData)*fieldNum)
for _, v0 := range arrMyData
v1, ok := v0.(*MyDataBatchInfo)
if !ok
Log.Error("error:not *MyDataBatchInfo")
return errors.New("batch myData not needed type")
for _, v2 := range v1.protoData
valueStrings = append(valueStrings, sParam)
valueArgs = append(valueArgs, v1.idStr)
valueArgs = append(valueArgs, v1.datetime)
valueArgs = append(valueArgs, v2.GetOsInfo())
// 其他参数的设置。。。
if len(valueStrings) > 0
stmtBase := "INSERT INTO myData_baseinfo (mac,infotime,os,/*等等其他参数*/) VALUES %s ON DUPLICATE KEY UPDATE infotime=values(infotime),os=values(os),/*等等其他更新参数*/ "
err := insertDataBatch(stmtBase, valueStrings, valueArgs, fieldNum)
if err != nil
return err
return nil
至此,批处理 被完美的解决。还有了异步缓存的功能。
以上是关于数据库批量执行实践(go语言)的主要内容,如果未能解决你的问题,请参考以下文章