小项目SQL server数据实时同步到mysql
Posted 一曲长歌一剑天涯
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小项目SQL server数据实时同步到mysql相关的知识,希望对你有一定的参考价值。
golang代码实现
项目代码:https://github.com/wzz1234wzz...
1. 目录结构
sqlserver
├── config //配置文件
│ ├── config.go
│ ├── config_test.go
│ └── dev_custom.yml
├── dao // mysql数据库操作
│ ├── mysql.go // 初始化mysql
│ ├── record.go // 记录表操作
│ ├── record_test.go
│ ├── users.go
│ └── users_test.go
├── go.mod
├── go.sum
├── logic // 业务逻辑
│ ├── logic.go
│ └── logic_test.go
├── main.go // 入口函数
├── models // 数据库字段模型
│ ├── record.go
│ ├── reverse // xorm数据库自动生成表结构工具
│ │ ├── README.txt
│ │ └── gennerate.sh // 可执行文件
│ └── users.go
├── sqldao // sql server数据库操作
│ ├── sql.go // 初始化sql
│ ├── users.go // users表操作
│ └── users_test.go
└── sqlserver.exe
2.各个文件内容
2.1 dev_custom.yml
mySql:
dns: "user:passwd@tcp(127.0.0.1:3306)/wzz_db?charset=utf8"
sqlServer:
dns: "sqlserver://sa:123456@localhost:1433?database=wzz"
2.2 config.go
package config
import (
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"runtime"
"strings"
)
// CustomT CustomT
type Mysql struct {
DNS string `yaml:"dns"`
}
type SqlServer struct {
DNS string `yaml:"dns"`
}
// CustomT CustomT
type CustomT struct {
Mysql Mysql `yaml:"mySql"`
SqlServer SqlServer `yaml:"sqlServer"`
}
// Custom Custom
var Custom CustomT
// ReadConfig ReadConfig for custom
func ReadConfig(configName string, configPath string, configType string) *viper.Viper {
v := viper.New()
v.SetConfigName(configName)
v.AddConfigPath(configPath)
v.SetConfigType(configType)
err := v.ReadInConfig()
if err != nil {
return nil
}
return v
}
func CurrentFileDir() string {
_, file, _, ok := runtime.Caller(1)
if !ok {
return "失败"
}
i := strings.LastIndex(file, "/")
if i < 0 {
i = strings.LastIndex(file, "\\\\")
}
return string(file[0 : i+1])
}
// InitConfig InitConfig
func InitConfig() {
path := CurrentFileDir()
v := ReadConfig("dev_custom", path, "yml")
md := mapstructure.Metadata{}
err := v.Unmarshal(&Custom, func(config *mapstructure.DecoderConfig) {
config.TagName = "yaml"
config.Metadata = &md
})
if err != nil {
panic(err)
return
}
fmt.Println("InitConfig Success!")
}
2.3 config_test.go
package config
import (
"testing"
)
func TestUsers(t *testing.T) {
t.Log(CurrentFileDir())
InitConfig()
t.Log("mysqlDNS=", Custom.Mysql.DNS)
t.Log("sqlserverDNS=", Custom.SqlServer.DNS)
}
2.4 dao/mysql.go
package dao
import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"sqlserver/config"
"xorm.io/xorm"
)
var Orm *xorm.Engine
var (
Users = &UsersDao{}
Record = &RecordDao{}
)
func InitMysql() {
orm, err := xorm.NewEngine("mysql", config.Custom.Mysql.DNS)
if err != nil {
fmt.Println("init mysql fail. err = ", err)
return
}
Orm = orm
fmt.Println("InitMysql success!")
}
2.5 dao/record.go
package dao
import (
"context"
"sqlserver/models"
"time"
"xorm.io/xorm"
)
type RecordDao struct{}
func (dao *RecordDao) AddTable(ctx context.Context, session *xorm.Session, tableName string, lastId int64) error {
record := &models.Record{}
record.TableName = tableName
record.LastId = lastId
record.CreatedAt = time.Now()
record.UpdatedAt = time.Now()
_, err := session.InsertOne(record)
return err
}
func (dao *RecordDao) QueryLastIdByTableName(ctx context.Context, session *xorm.Session, tableName string) *models.Record {
record := &models.Record{}
record.TableName = tableName
has, err := session.Get(record)
if err != nil || !has {
return nil
}
return record
}
func (dao *RecordDao) UpdateLastId(ctx context.Context, session *xorm.Session, record *models.Record) error {
session.Where("table_name = ?", record.TableName)
_, err := session.Cols("last_id", "updated_at").Update(record)
if err != nil {
return err
}
return nil
}
2.6 dao/record_test.go
package dao
import (
"context"
_ "github.com/go-sql-driver/mysql"
"sqlserver/models"
"testing"
"time"
"xorm.io/xorm"
)
var mysqlDNS = "root:123456@tcp(10.0.0.0:3306)/wzz_db?charset=utf8"
func TestAddRecord(t *testing.T) {
ctx := context.Background()
orm, err := xorm.NewEngine("mysql", mysqlDNS)
if err != nil {
t.Fatal("open mysql fail. err = ", err)
}
session := orm.Context(ctx)
var dao RecordDao
err = dao.AddTable(ctx, session, "job", 0)
if err != nil {
t.Fatal("AddTable fail. err = ", err)
}
}
func TestQueryLastIdByTableName(t *testing.T) {
ctx := context.Background()
orm, err := xorm.NewEngine("mysql", mysqlDNS)
if err != nil {
t.Fatal("open mysql fail. err = ", err)
}
session := orm.Context(ctx)
var dao RecordDao
record := dao.QueryLastIdByTableName(ctx, session, "users")
if record == nil {
t.Fatal("QueryLastIdByTableName fail. err = ", err)
}
t.Log("lastId=", record.LastId)
}
func TestUpdateLastId(t *testing.T) {
ctx := context.Background()
orm, err := xorm.NewEngine("mysql", mysqlDNS)
if err != nil {
t.Fatal("open mysql fail. err = ", err)
}
session := orm.Context(ctx)
var dao RecordDao
record := &models.Record{1, "users", 5, time.Now(), time.Now()}
err = dao.UpdateLastId(ctx, session, record)
if err != nil {
t.Fatal("UpdateLastId fail. err = ", err)
}
record = dao.QueryLastIdByTableName(ctx, session, "users")
if record == nil {
t.Fatal("QueryLastIdByTableName fail. err = ", err)
}
t.Log("lastId=", record.LastId)
}
2.7 dao/users.go
package dao
import (
"context"
"errors"
"sqlserver/models"
"xorm.io/xorm"
)
type UsersDao struct{}
func (dao *UsersDao) BatchAddUser(ctx context.Context, session *xorm.Session, users []*models.Users) error {
if users == nil {
return errors.New("Jobs object are nil.\\n")
}
_, err := session.Insert(users)
return err
}
func (dao *UsersDao) QueryByUserID(ctx context.Context, session *xorm.Session, userID int64) *models.Users {
user := &models.Users{}
user.Userid = userID
has, err := session.Get(user)
if err != nil || !has {
return nil
}
return user
}
2.8 dao/users_test.go
package dao
import (
"context"
_ "github.com/go-sql-driver/mysql"
"sqlserver/models"
"testing"
"xorm.io/xorm"
)
func TestBatchAddUsers(t *testing.T) {
var users []*models.Users
users = append(users, &models.Users{6, "wzz", 10})
users = append(users, &models.Users{7, "gh", 20})
ctx := context.Background()
orm, err := xorm.NewEngine("mysql", mysqlDNS)
if err != nil {
t.Fatal("open mysql fail. err = ", err)
}
session := orm.Context(ctx)
var dao UsersDao
err = dao.BatchAddUser(ctx, session, users)
if err != nil {
t.Fatal("AddTable fail. err = ", err)
}
user := dao.QueryByUserID(ctx, session, 7)
t.Log("user=", user)
}
2.9 logic/logic.go
package logic
import (
"context"
"fmt"
cron "github.com/robfig/cron/v3"
"sqlserver/dao"
"sqlserver/sqldao"
)
const (
cronSpecCheck = "*/10 * * * * *" //Every 30s
)
func Do() *cron.Cron {
cron := cron.New(cron.WithSeconds())
cron.AddFunc(cronSpecCheck, Supervisor)
cron.Start()
return cron
}
func Supervisor() {
ctx := context.Background()
session := dao.Orm.Context(ctx)
record := dao.Record.QueryLastIdByTableName(ctx, session, "users")
if record == nil {
fmt.Println("查询失败!")
return
}
fmt.Println("lastid=", record.LastId)
users := sqldao.SqlUser.QueryByUserIDFromSqlserver(ctx, sqldao.Sql, record.LastId)
if len(users) == 0 {
fmt.Println("无更新...")
return
}
fmt.Println("users=", users)
for _, use := range users {
fmt.Println(*use)
}
record.LastId = users[len(users)-1].Userid
dao.Record.UpdateLastId(ctx, session, record)
err := dao.Users.BatchAddUser(ctx, session, users)
if err != nil {
fmt.Println("添加失败!")
return
}
}
2.10 logic/logic_test.go
package logic
import (
"github.com/robfig/cron/v3"
"sqlserver/config"
"sqlserver/dao"
"sqlserver/sqldao"
"testing"
"time"
)
func TestLogic(t *testing.T) {
config.InitConfig()
dao.InitMysql()
sqldao.InitSqlserver()
Do()
}
func TestCron(t *testing.T) {
c := cron.New()
i := 1
c.AddFunc("*/1 * * * *", func() {
t.Log("每分钟执行一次", i)
i++
})
c.Start()
time.Sleep(time.Minute * 5)
}
2.11 models/record.go
package models
import (
"time"
)
type Record struct {
Id int `xorm:"not null pk autoincr INT(10)"`
TableName string `xorm:"not null VARCHAR(100)"`
LastId int64 `xorm:"not null BIGINT(20)"`
CreatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
UpdatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
}
2.12 modles/users.go
package models
type Users struct {
Userid int64 `xorm:"INT(11)"`
Name string `xorm:"CHAR(10)"`
Age int `xorm:"INT(11)"`
}
2.13 modles/reverse/custom.yml
kind: reverse
name: testdb
source:
database: mysql
conn_str: \'root:123456@tcp(10.0.0.0:3306)/wzz_db?parseTime=true\'
targets:
- type: codes
multiple_files: true
language: golang
output_dir: ../
2.14 modles/reverse/generate.sh
reverse -f custom.yml
2.15 sqldao/sql.go
package sqldao
import (
"fmt"
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"sqlserver/config"
)
var Sql *gorm.DB
var SqlUser = &SqlUsersDao{}
func InitSqlserver() {
gSession, err := gorm.Open(sqlserver.Open(config.Custom.SqlServer.DNS), &gorm.Config{})
if err != nil {
panic(err)
}
Sql = gSession
fmt.Println("InitSqlserver success!")
}
2.16 sqldao/users.go
package sqldao
import (
"context"
"gorm.io/gorm"
"sqlserver/models"
)
type SqlUsersDao struct{}
func (dao *SqlUsersDao) QueryByUserIDFromSqlserver(ctx context.Context, gSession *gorm.DB, lastId int64) []*models.Users {
var users = make([]*models.Users, 0)
gSession.Table("dbo.users").Where("userid> ?", lastId).Find(&users)
return users
}
func (dao *SqlUsersDao) QueryLastFromSqlserver(ctx context.Context, gSession *gorm.DB) models.Users {
var user models.Users
gSession.Table("dbo.users").Last(&user)
return user
}
2.17 sqldao/users_test.go
package sqldao
import (
"context"
"fmt"
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"testing"
)
var SQLDSN = "sqlserver://sa:123456@localhost:1433?database=wzz"
func TestUsers(t *testing.T) {
gSession, err := gorm.Open(sqlserver.Open(SQLDSN), &gorm.Config{})
if err != nil {
panic(err)
}
ctx := context.Background()
var dao SqlUsersDao
usr := dao.QueryLastFromSqlserver(ctx, gSession)
fmt.Println("last id=", usr.Userid)
users := dao.QueryByUserIDFromSqlserver(ctx, gSession, 2)
for _, u := range users {
fmt.Println(u)
}
}
2.18 go.mod
module sqlserver
go 1.15
require (
github.com/alexbrainman/odbc v0.0.0-20200426075526-f0492dfa1575
github.com/denisenkom/go-mssqldb v0.10.0
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/jinzhu/gorm v1.9.16
github.com/mattn/go-adodb v0.0.1
github.com/mitchellh/mapstructure v1.4.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/viper v1.7.1
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
gorm.io/driver/sqlserver v1.0.7
gorm.io/gorm v1.21.10
xorm.io/xorm v1.1.0
)
2.19 main.go
package main
import (
"sqlserver/config"
"sqlserver/dao"
"sqlserver/logic"
"sqlserver/sqldao"
)
func init() {
config.InitConfig()
dao.InitMysql()
sqldao.InitSqlserver()
}
func main() {
cron := logic.Do()
defer cron.Stop()
// 死循环
ch := make(chan int)
<-ch
}
以上是关于小项目SQL server数据实时同步到mysql的主要内容,如果未能解决你的问题,请参考以下文章
如何将数据从 SQL Server 实时或定时同步到 MySQL 数据库
Oracle 数据怎么实时同步到 SQL Server | 亲测干货分享建议收藏