小项目SQL server数据实时同步到mysql

Posted 一曲长歌一剑天涯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小项目SQL server数据实时同步到mysql相关的知识,希望对你有一定的参考价值。

golang代码实现

项目代码:https://github.com/wzz1234wzz...

1. 目录结构

sqlserver
├── config  //配置文件
│   ├── config.go
│   ├── config_test.go
│   └── dev_custom.yml
├── dao // mysql数据库操作
│   ├── mysql.go // 初始化mysql
│   ├── record.go // 记录表操作
│   ├── record_test.go
│   ├── users.go 
│   └── users_test.go
├── go.mod
├── go.sum
├── logic // 业务逻辑
│   ├── logic.go
│   └── logic_test.go
├── main.go // 入口函数
├── models  // 数据库字段模型
│   ├── record.go
│   ├── reverse // xorm数据库自动生成表结构工具
│   │   ├── README.txt
│   │   └── generate.sh // 可执行文件
│   └── users.go
├── sqldao // sql server数据库操作
│   ├── sql.go // 初始化sql
│   ├── users.go // users表操作
│   └── users_test.go
└── sqlserver.exe

2.各个文件内容

2.1 dev_custom.yml

mySql:
  dns: "user:passwd@tcp(127.0.0.1:3306)/wzz_db?charset=utf8"

sqlServer:
  dns: "sqlserver://sa:123456@localhost:1433?database=wzz"

2.2 config.go

package config

import (
    "fmt"
    "github.com/mitchellh/mapstructure"
    "github.com/spf13/viper"
    "runtime"
    "strings"
)

// Mysql holds the MySQL settings found under the "mySql" key of the
// configuration file.
type Mysql struct {
    // DNS is the connection string read from the "dns" key.
    DNS string `yaml:"dns"`
}

// SqlServer holds the SQL Server settings found under the "sqlServer" key of
// the configuration file.
type SqlServer struct {
    // DNS is the connection string read from the "dns" key.
    DNS string `yaml:"dns"`
}

// CustomT is the root of the custom configuration file; each field maps to
// one top-level section selected by its yaml tag.
type CustomT struct {
    Mysql     Mysql     `yaml:"mySql"`
    SqlServer SqlServer `yaml:"sqlServer"`
}

// Custom is the package-level configuration value that InitConfig decodes
// the configuration file into.
var Custom CustomT

// ReadConfig loads the configuration file called configName (without its
// extension) of type configType from the directory configPath.
//
// It returns the populated viper instance, or nil when the file cannot be
// read. The underlying error is printed so that the failure is not silent;
// callers must check for a nil result before using it.
func ReadConfig(configName string, configPath string, configType string) *viper.Viper {
    v := viper.New()
    v.SetConfigName(configName)
    v.AddConfigPath(configPath)
    v.SetConfigType(configType)
    if err := v.ReadInConfig(); err != nil {
        fmt.Println("read config fail. err = ", err)
        return nil
    }
    return v
}

// CurrentFileDir returns the directory, including the trailing separator, of
// the source file that called it, as reported by runtime.Caller(1).
//
// It returns the literal string "失败" when the caller information is
// unavailable, and an empty string when the path contains no separator.
func CurrentFileDir() string {
    _, file, _, ok := runtime.Caller(1)
    if !ok {
        return "失败"
    }
    i := strings.LastIndex(file, "/")
    if i < 0 {
        // Fall back to a Windows-style separator, which is a single
        // backslash; searching for two consecutive ones never matched it.
        i = strings.LastIndex(file, `\`)
    }

    return file[:i+1]
}

// InitConfig loads dev_custom.yml from the directory of this source file and
// decodes it into the package-level Custom value, matching keys through the
// `yaml` struct tags.
//
// It panics when the file cannot be read or decoded, because the rest of the
// program cannot run without its connection strings.
func InitConfig() {
    path := CurrentFileDir()
    v := ReadConfig("dev_custom", path, "yml")
    if v == nil {
        // ReadConfig reports failure with nil; calling Unmarshal on it would
        // crash with an opaque nil-pointer dereference instead.
        panic(fmt.Sprintf("config: cannot read dev_custom.yml from %q", path))
    }
    md := mapstructure.Metadata{}
    err := v.Unmarshal(&Custom, func(config *mapstructure.DecoderConfig) {
        config.TagName = "yaml"
        config.Metadata = &md
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("InitConfig Success!")
}

2.3 config_test.go

package config

import (
    "testing"
)

// TestUsers loads the configuration and logs the resolved directory together
// with both connection strings.
func TestUsers(t *testing.T) {
    dir := CurrentFileDir()
    t.Log(dir)

    InitConfig()

    loaded := &Custom
    t.Log("mysqlDNS=", loaded.Mysql.DNS)
    t.Log("sqlserverDNS=", loaded.SqlServer.DNS)
}

2.4 dao/mysql.go

package dao

import (
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "sqlserver/config"
    "xorm.io/xorm"
)

// Orm is the shared xorm engine for MySQL. It is nil until InitMysql has
// assigned it.
var Orm *xorm.Engine

// Users and Record are the package-level DAO instances for the users table
// and the record table.
var (
    Users  = &UsersDao{}
    Record = &RecordDao{}
)

// InitMysql creates the xorm engine from config.Custom.Mysql.DNS and stores
// it in Orm.
//
// It panics when the engine cannot be created. Printing the error and
// returning left Orm nil, which only postponed the crash to the first query.
func InitMysql() {
    orm, err := xorm.NewEngine("mysql", config.Custom.Mysql.DNS)
    if err != nil {
        panic(fmt.Errorf("init mysql fail: %w", err))
    }
    Orm = orm
    fmt.Println("InitMysql success!")
}

2.5 dao/record.go

package dao

import (
    "context"
    "sqlserver/models"
    "time"
    "xorm.io/xorm"
)

// RecordDao groups the database operations on models.Record rows. It carries
// no state of its own.
type RecordDao struct{}

// AddTable inserts a new record row for tableName with the given lastId,
// stamping CreatedAt and UpdatedAt with the current time.
// The ctx parameter is accepted but not used by this method.
func (dao *RecordDao) AddTable(ctx context.Context, session *xorm.Session, tableName string, lastId int64) error {
    row := &models.Record{
        TableName: tableName,
        LastId:    lastId,
        CreatedAt: time.Now(),
        UpdatedAt: time.Now(),
    }
    _, err := session.InsertOne(row)
    return err
}

// QueryLastIdByTableName looks up the record row whose TableName equals
// tableName. It returns nil both when the lookup fails and when no row
// matches; the two cases are not distinguished.
// The ctx parameter is accepted but not used by this method.
func (dao *RecordDao) QueryLastIdByTableName(ctx context.Context, session *xorm.Session, tableName string) *models.Record {
    found := &models.Record{TableName: tableName}
    if ok, err := session.Get(found); err == nil && ok {
        return found
    }
    return nil
}

// UpdateLastId writes the last_id and updated_at columns of record to the row
// whose table_name equals record.TableName. The values written are whatever
// the caller has set on record.
// The ctx parameter is accepted but not used by this method.
func (dao *RecordDao) UpdateLastId(ctx context.Context, session *xorm.Session, record *models.Record) error {
    _, err := session.
        Where("table_name = ?", record.TableName).
        Cols("last_id", "updated_at").
        Update(record)
    return err
}

2.6 dao/record_test.go

package dao

import (
    "context"
    _ "github.com/go-sql-driver/mysql"
    "sqlserver/models"
    "testing"
    "time"
    "xorm.io/xorm"
)

// mysqlDNS is the MySQL connection string used by the tests in this package.
// NOTE(review): credentials and host are hard-coded; adjust to the local
// environment before running.
var mysqlDNS = "root:123456@tcp(10.0.0.0:3306)/wzz_db?charset=utf8"

// TestAddRecord inserts a record row for the "job" table with last id 0.
func TestAddRecord(t *testing.T) {
    ctx := context.Background()
    orm, err := xorm.NewEngine("mysql", mysqlDNS)
    if err != nil {
        t.Fatal("open mysql fail. err = ", err)
    }
    // Release the engine's connections when the test ends.
    defer orm.Close()
    session := orm.Context(ctx)
    var dao RecordDao
    err = dao.AddTable(ctx, session, "job", 0)
    if err != nil {
        t.Fatal("AddTable fail. err = ", err)
    }
}

// TestQueryLastIdByTableName reads the record row for the "users" table and
// logs its last id.
func TestQueryLastIdByTableName(t *testing.T) {
    ctx := context.Background()
    orm, err := xorm.NewEngine("mysql", mysqlDNS)
    if err != nil {
        t.Fatal("open mysql fail. err = ", err)
    }
    // Release the engine's connections when the test ends.
    defer orm.Close()
    session := orm.Context(ctx)
    var dao RecordDao
    record := dao.QueryLastIdByTableName(ctx, session, "users")
    if record == nil {
        // The DAO does not return an error, so err is always nil here and
        // must not be reported as the cause.
        t.Fatal("QueryLastIdByTableName fail: no record returned for table users")
    }
    t.Log("lastId=", record.LastId)
}

// TestUpdateLastId sets the last id of the "users" record to 5, reads the row
// back and logs the stored value.
func TestUpdateLastId(t *testing.T) {
    ctx := context.Background()
    orm, err := xorm.NewEngine("mysql", mysqlDNS)
    if err != nil {
        t.Fatal("open mysql fail. err = ", err)
    }
    // Release the engine's connections when the test ends.
    defer orm.Close()
    session := orm.Context(ctx)
    var dao RecordDao
    // Keyed fields keep the literal valid if models.Record gains or reorders
    // fields.
    record := &models.Record{
        Id:        1,
        TableName: "users",
        LastId:    5,
        CreatedAt: time.Now(),
        UpdatedAt: time.Now(),
    }
    err = dao.UpdateLastId(ctx, session, record)
    if err != nil {
        t.Fatal("UpdateLastId fail. err = ", err)
    }
    record = dao.QueryLastIdByTableName(ctx, session, "users")
    if record == nil {
        // err is nil at this point; the DAO reports failure only through nil.
        t.Fatal("QueryLastIdByTableName fail: no record returned for table users")
    }
    t.Log("lastId=", record.LastId)
}

2.7 dao/users.go

package dao

import (
    "context"
    "errors"
    "sqlserver/models"
    "xorm.io/xorm"
)

// UsersDao groups the MySQL operations on models.Users rows. It carries no
// state of its own.
type UsersDao struct{}

// BatchAddUser inserts all given users through session in a single Insert
// call.
//
// It returns an error without touching the database when users is nil or
// empty; otherwise it returns the error reported by the insert, if any.
// The ctx parameter is accepted but not used by this method.
func (dao *UsersDao) BatchAddUser(ctx context.Context, session *xorm.Session, users []*models.Users) error {
    if len(users) == 0 {
        return errors.New("users are empty")
    }
    _, err := session.Insert(users)
    return err
}

// QueryByUserID fetches the user whose Userid equals userID. It returns nil
// both when the lookup fails and when no row matches.
// The ctx parameter is accepted but not used by this method.
func (dao *UsersDao) QueryByUserID(ctx context.Context, session *xorm.Session, userID int64) *models.Users {
    found := &models.Users{Userid: userID}
    if ok, err := session.Get(found); err == nil && ok {
        return found
    }
    return nil
}

2.8 dao/users_test.go

package dao

import (
    "context"
    _ "github.com/go-sql-driver/mysql"
    "sqlserver/models"
    "testing"
    "xorm.io/xorm"
)

// TestBatchAddUsers inserts two users and then reads one of them back by id.
func TestBatchAddUsers(t *testing.T) {
    // Keyed fields keep the literals valid if models.Users gains or reorders
    // fields.
    users := []*models.Users{
        {Userid: 6, Name: "wzz", Age: 10},
        {Userid: 7, Name: "gh", Age: 20},
    }

    ctx := context.Background()
    orm, err := xorm.NewEngine("mysql", mysqlDNS)
    if err != nil {
        t.Fatal("open mysql fail. err = ", err)
    }
    // Release the engine's connections when the test ends.
    defer orm.Close()
    session := orm.Context(ctx)
    var dao UsersDao
    err = dao.BatchAddUser(ctx, session, users)
    if err != nil {
        t.Fatal("BatchAddUser fail. err = ", err)
    }
    user := dao.QueryByUserID(ctx, session, 7)
    if user == nil {
        t.Fatal("QueryByUserID fail: user 7 not found after insert")
    }
    t.Log("user=", user)
}

2.9 logic/logic.go

package logic

import (
    "context"
    "fmt"
    cron "github.com/robfig/cron/v3"
    "sqlserver/dao"
    "sqlserver/sqldao"
)

const (
    // cronSpecCheck is the schedule for Supervisor. Do parses it with
    // cron.WithSeconds, so the first of the six fields is seconds and the
    // job fires every 10 seconds.
    cronSpecCheck = "*/10 * * * * *"
)

// Do creates a seconds-resolution cron scheduler, registers Supervisor on the
// cronSpecCheck schedule, starts the scheduler and returns it so the caller
// can stop it.
//
// It panics when the schedule cannot be registered: the spec is a
// compile-time constant, so a failure there is a programming error.
func Do() *cron.Cron {
    // Named c rather than cron so the package name is not shadowed.
    c := cron.New(cron.WithSeconds())
    if _, err := c.AddFunc(cronSpecCheck, Supervisor); err != nil {
        panic(err)
    }
    c.Start()
    return c
}

// Supervisor performs one synchronisation pass for the "users" table.
//
// It reads the stored last id from the MySQL record table, fetches the SQL
// Server users with a larger userid, inserts them into MySQL and then stores
// the highest userid seen as the new last id. The insert and the last-id
// update run in one transaction, so a failure in either leaves the last id
// unchanged and the same rows are fetched again on the next pass.
//
// Failures are printed and end the pass; nothing is returned because the
// function is invoked by the cron scheduler.
func Supervisor() {
    ctx := context.Background()
    session := dao.Orm.NewSession().Context(ctx)
    // Close releases the session and, per the xorm transaction docs, rolls
    // back a transaction that was begun but not committed.
    defer session.Close()

    record := dao.Record.QueryLastIdByTableName(ctx, session, "users")
    if record == nil {
        fmt.Println("查询失败!")
        return
    }
    fmt.Println("lastid=", record.LastId)
    users := sqldao.SqlUser.QueryByUserIDFromSqlserver(ctx, sqldao.Sql, record.LastId)
    if len(users) == 0 {
        fmt.Println("无更新...")
        return
    }
    fmt.Println("users=", users)
    // Take the maximum explicitly instead of trusting the last element, so
    // the new last id does not depend on the order the rows came back in.
    maxID := record.LastId
    for _, use := range users {
        fmt.Println(*use)
        if use.Userid > maxID {
            maxID = use.Userid
        }
    }

    if err := session.Begin(); err != nil {
        fmt.Println("添加失败!", err)
        return
    }
    // Insert first and advance the last id afterwards; doing it the other way
    // round would skip these rows for good if the insert failed.
    if err := dao.Users.BatchAddUser(ctx, session, users); err != nil {
        fmt.Println("添加失败!", err)
        return
    }
    record.LastId = maxID
    if err := dao.Record.UpdateLastId(ctx, session, record); err != nil {
        fmt.Println("更新失败!", err)
        return
    }
    if err := session.Commit(); err != nil {
        fmt.Println("添加失败!", err)
        return
    }
}

2.10 logic/logic_test.go

package logic

import (
    "github.com/robfig/cron/v3"
    "sqlserver/config"
    "sqlserver/dao"
    "sqlserver/sqldao"
    "testing"
    "time"
)

// TestLogic initialises the configuration and both databases, starts the
// scheduler and keeps it running long enough for one scheduled pass.
func TestLogic(t *testing.T) {
    config.InitConfig()
    dao.InitMysql()
    sqldao.InitSqlserver()
    c := Do()
    // The job fires every 10 seconds; returning immediately would end the
    // test before any pass has run.
    time.Sleep(11 * time.Second)
    // Stop the scheduler and wait for a running job to finish so that no
    // goroutine outlives the test.
    <-c.Stop().Done()
}

// TestCron registers a job on a once-per-minute schedule and lets the
// scheduler run for five minutes, logging a counter on every invocation.
func TestCron(t *testing.T) {
    c := cron.New()
    i := 1
    _, err := c.AddFunc("*/1 * * * *", func() {
        t.Log("每分钟执行一次", i)
        i++
    })
    if err != nil {
        t.Fatal("AddFunc fail. err = ", err)
    }
    c.Start()
    time.Sleep(time.Minute * 5)
    // Stop the scheduler and wait for a running job, so that t.Log is never
    // called after the test has completed.
    <-c.Stop().Done()
}

2.11 models/record.go

package models

import (
    "time"
)

// Record is one row of the record table: for a source table name it stores
// the last id that has been handled, plus creation and update timestamps.
type Record struct {
    Id        int       `xorm:"not null pk autoincr INT(10)"`
    TableName string    `xorm:"not null VARCHAR(100)"`
    LastId    int64     `xorm:"not null BIGINT(20)"`
    CreatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
    UpdatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
}

2.12 models/users.go

package models

// Users is one row of the users table. The same struct is used for rows read
// from SQL Server and rows written to MySQL.
type Users struct {
    Userid int64  `xorm:"INT(11)"`
    Name   string `xorm:"CHAR(10)"`
    Age    int    `xorm:"INT(11)"`
}

2.13 models/reverse/custom.yml

kind: reverse
name: testdb
source:
  database: mysql
  conn_str: 'root:123456@tcp(10.0.0.0:3306)/wzz_db?parseTime=true'
targets:
- type: codes
  multiple_files: true
  language: golang
  output_dir: ../

2.14 models/reverse/generate.sh

reverse -f custom.yml

2.15 sqldao/sql.go

package sqldao

import (
    "fmt"
    "gorm.io/driver/sqlserver"
    "gorm.io/gorm"
    "sqlserver/config"
)

// Sql is the shared gorm handle for SQL Server. It is nil until
// InitSqlserver has assigned it.
var Sql *gorm.DB

// SqlUser is the package-level DAO instance for the SQL Server users table.
var SqlUser = &SqlUsersDao{}

func InitSqlserver() {
    gSession, err := gorm.Open(sqlserver.Open(config.Custom.SqlServer.DNS), &gorm.Config{})
    if err != nil {
        panic(err)
    }
    Sql = gSession
    fmt.Println("InitSqlserver success!")
}

2.16 sqldao/users.go

package sqldao

import (
    "context"
    "gorm.io/gorm"
    "sqlserver/models"
)

type SqlUsersDao struct{}

// QueryByUserIDFromSqlserver returns the rows of dbo.users whose userid is
// greater than lastId, in ascending userid order so that the last element
// carries the highest id. The query is bound to ctx.
//
// The result is never nil. A query error is not surfaced through the return
// value; in that case the slice holds whatever gorm managed to fill in.
func (dao *SqlUsersDao) QueryByUserIDFromSqlserver(ctx context.Context, gSession *gorm.DB, lastId int64) []*models.Users {
    users := make([]*models.Users, 0)
    gSession.WithContext(ctx).
        Table("dbo.users").
        Where("userid> ?", lastId).
        Order("userid").
        Find(&users)
    return users
}

// QueryLastFromSqlserver asks gorm for the last row of dbo.users and returns
// it by value. A query error is not reported; the zero value is returned when
// nothing was loaded.
// NOTE(review): models.Users declares no primary key, so which row counts as
// "last" depends on gorm's fallback ordering — confirm against the gorm docs.
// The ctx parameter is accepted but not used by this method.
func (dao *SqlUsersDao) QueryLastFromSqlserver(ctx context.Context, gSession *gorm.DB) models.Users {
    last := models.Users{}
    query := gSession.Table("dbo.users")
    query.Last(&last)
    return last
}

2.17 sqldao/users_test.go

package sqldao

import (
    "context"
    "fmt"
    "gorm.io/driver/sqlserver"
    "gorm.io/gorm"
    "testing"
)

// SQLDSN is the SQL Server connection string used by the tests in this
// package.
// NOTE(review): credentials and host are hard-coded; adjust to the local
// environment before running.
var SQLDSN = "sqlserver://sa:123456@localhost:1433?database=wzz"

// TestUsers opens SQL Server, prints the id of the last user row and then
// prints every user whose userid is greater than 2.
func TestUsers(t *testing.T) {
    gSession, err := gorm.Open(sqlserver.Open(SQLDSN), &gorm.Config{})
    if err != nil {
        // Report through the testing framework instead of panicking, so the
        // failure is attributed to this test.
        t.Fatal("open sqlserver fail. err = ", err)
    }
    ctx := context.Background()
    var dao SqlUsersDao
    usr := dao.QueryLastFromSqlserver(ctx, gSession)
    fmt.Println("last id=", usr.Userid)
    users := dao.QueryByUserIDFromSqlserver(ctx, gSession, 2)
    for _, u := range users {
        fmt.Println(u)
    }
}

2.18 go.mod

module sqlserver

go 1.15

require (
    github.com/alexbrainman/odbc v0.0.0-20200426075526-f0492dfa1575
    github.com/denisenkom/go-mssqldb v0.10.0
    github.com/go-ole/go-ole v1.2.5 // indirect
    github.com/go-sql-driver/mysql v1.5.0
    github.com/jinzhu/gorm v1.9.16
    github.com/mattn/go-adodb v0.0.1
    github.com/mitchellh/mapstructure v1.4.1
    github.com/robfig/cron/v3 v3.0.1
    github.com/spf13/viper v1.7.1
    golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
    gorm.io/driver/sqlserver v1.0.7
    gorm.io/gorm v1.21.10
    xorm.io/xorm v1.1.0
)

2.19 main.go

package main

import (
    "sqlserver/config"
    "sqlserver/dao"
    "sqlserver/logic"
    "sqlserver/sqldao"
)

// init prepares the process before main runs: it loads the configuration and
// then opens the MySQL and SQL Server handles.
func init() {
    // The configuration must be loaded first; both database initialisers read
    // their connection strings from config.Custom.
    config.InitConfig()
    dao.InitMysql()
    sqldao.InitSqlserver()
}

// main starts the scheduled synchronisation job and then blocks forever so
// that the scheduler keeps running.
func main() {
    scheduler := logic.Do()
    defer scheduler.Stop()

    // Block forever: nothing ever wakes this goroutine.
    select {}
}

以上是关于小项目SQL server数据实时同步到mysql的主要内容,如果未能解决你的问题,请参考以下文章

MySQL 到 SQL Server 实时数据同步实操分享

MySQL 到 SQL Server 实时数据同步实操分享

如何将数据从 SQL Server 实时或定时同步到 MySQL 数据库

Oracle 数据怎么实时同步到 SQL Server | 亲测干货分享建议收藏

如何把sql server一张表的数据实时同步到Oracle数据库

mysql和sql server可以同时安装吗