转移,清洗,同步数据

Posted 一明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转移,清洗,同步数据相关的知识,希望对你有一定的参考价值。

最近看了看公司导入、清洗、同步数据的实现,想自己也实现一下。

首先用SqlBulkCopy批量导入;清洗时用ROW_NUMBER() OVER(PARTITION BY ...)按重复字段分组并编号,然后删除组内行号大于1的数据(即每组只保留第一条)。同步数据就是对源数据进行查询,然后按主键逐条更新目标数据。

我用MVC实现了下,代码实现如下:

前台代码

<!--
    Data-sync page. Renders three buttons; the inline script that follows this markup
    posts to /Home/CopyData, /Home/SyncClick3 and /Home/SelectDistinct when they are clicked.
-->
@{
    // Render this view standalone, without a shared layout page.
    Layout = null;
}
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>数据同步</title>
</head>
<body>
    <!-- Copy data (posts to /Home/CopyData). -->
    <input type="button" id="btnCopy" value="复制数据"/>
    <!-- Sync changed data (posts to /Home/SyncClick3). -->
    <input type="button" id="btnSync" value="同步已有变化的数据"/>
    <!-- Clean data (posts to /Home/SelectDistinct). -->
    <input type="button" id="btnClear" value="清洗数据"/>
</body>
</html>
<script src="~/Scripts/jquery-1.10.2.min.js"></script>
<script>
    /**
     * Wires a button to a POST endpoint that answers with the plain text "ok" on success.
     *
     * The button is disabled while the request is in flight to prevent duplicate
     * submissions and is re-enabled when the request completes, whatever the outcome.
     *
     * @param {string} selector        jQuery selector of the button.
     * @param {string} url             Endpoint to POST to.
     * @param {string} successMessage  Alert text shown when the server answers "ok".
     * @param {string} failureMessage  Alert text shown for any other answer or an HTTP error.
     */
    function bindAjaxButton(selector, url, successMessage, failureMessage) {
        var $button = $(selector);

        $button.click(function () {
            $.ajax({
                type: "post",
                url: url,
                // The actions return a bare string; ask for text so the comparison
                // below never depends on jQuery's content-type guessing.
                dataType: "text",
                beforeSend: function () {
                    // Disable the button to prevent duplicate submissions.
                    $button.prop("disabled", true);
                },
                success: function (data) {
                    alert(data === "ok" ? successMessage : failureMessage);
                },
                error: function (xhr) {
                    // An HTTP-level failure (e.g. a 500) used to be logged only;
                    // tell the user as well so the click does not appear to do nothing.
                    console.error("error: " + xhr.responseText);
                    alert(failureMessage);
                },
                complete: function () {
                    $button.prop("disabled", false);
                }
            });
        });
    }

    bindAjaxButton("#btnCopy", "/Home/CopyData", "恭喜你,复制成功~", "复制失败!");
    bindAjaxButton("#btnSync", "/Home/SyncClick3", "恭喜你,同步成功~", "同步失败!");
    bindAjaxButton("#btnClear", "/Home/SelectDistinct", "恭喜你,清理成功~", "清理失败!");
</script>

后台代码

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Web;
using System.Web.Mvc;
using 数据同步.Models;

namespace 数据同步.Controllers
{
    /// <summary>
    /// Controller behind the data-sync page. Its actions copy every user table from the
    /// source database to the target database, push source values onto the target rows,
    /// and remove duplicate rows, all through <c>DBUtility</c>.
    /// </summary>
    /// <remarks>
    /// The source tables and the tables being synchronised must have the same structure.
    /// The first column of each table (ordered by <c>column_id</c>) is treated as its key
    /// column, so the key must be the same on both sides for a sync to succeed.
    /// Every action answers with the plain text "ok" or "no", which the page script checks.
    /// </remarks>
    public class HomeController : Controller
    {
        /// <summary>Response body the page script treats as success.</summary>
        private const string Success = "ok";

        /// <summary>Response body the page script treats as failure.</summary>
        private const string Failure = "no";

        /// <summary>Renders the data-sync page.</summary>
        public ActionResult Index()
        {
            return View();
        }

        /// <summary>
        /// Copies the rows of every user table from the source database to the target.
        /// </summary>
        /// <returns>"ok" when every table was copied, "no" when any step failed.</returns>
        [HttpPost]
        public ActionResult CopyData()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                foreach (string tableName in GetUserTableNames(db))
                {
                    List<string> columnNames = GetColumnNames(db, tableName);
                    if (columnNames.Count == 0)
                    {
                        // No columns resolved (e.g. the table is not in the dbo schema): skip it
                        // instead of failing on an out-of-range index.
                        continue;
                    }

                    // The first column is treated as the key column.
                    db.BulkCopyTo(tableName, columnNames[0]);
                }
                return Content(Success);
            }
            catch (Exception exc)
            {
                // Report the failure to the page as "no" (the script shows the failure alert)
                // and keep the details in the trace output instead of losing them.
                System.Diagnostics.Trace.TraceError("CopyData failed: {0}", exc);
                return Content(Failure);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>
        /// Pushes the current source values of every user table onto the target database.
        /// </summary>
        /// <returns>"ok" when every table was synchronised, "no" when any step failed.</returns>
        [HttpPost]
        public ActionResult SyncClick3()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                foreach (string tableName in GetUserTableNames(db))
                {
                    List<string> columns;
                    if (tableName == "Customers")
                    {
                        // Customers is synchronised on a fixed column list.
                        // NOTE(review): BulkUpdateTo uses columns[0] as the key, so this table is
                        // matched on CustomerName rather than CustomerId - confirm that is intended.
                        columns = new List<string> { "CustomerName", "CustomerId", "IsNewData" };
                    }
                    else
                    {
                        columns = GetColumnNames(db, tableName);
                    }

                    if (columns.Count == 0)
                    {
                        // Nothing to match or update on; see CopyData for when this happens.
                        continue;
                    }

                    // Columns that must never be overwritten on the target.
                    List<string> ignoreUpdateColumns = new List<string> { "ID" };
                    db.BulkUpdateTo(tableName, columns, ignoreUpdateColumns);
                }
                return Content(Success);
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("SyncClick3 failed: {0}", ex);
                return Content(Failure);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>
        /// Removes duplicate rows on the target database via <c>DBUtility.CleartData</c>.
        /// </summary>
        /// <returns>"ok" when the clean-up reported success, otherwise "no".</returns>
        // NOTE(review): this action deletes data but, unlike the other two, is not marked
        // [HttpPost]; the page only ever posts to it. Consider restricting it to POST.
        public ActionResult SelectDistinct()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                return Content(db.CleartData() ? Success : Failure);
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("SelectDistinct failed: {0}", ex);
                return Content(Failure);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>Lists the names of all user tables (xtype 'U') in the source database.</summary>
        private static List<string> GetUserTableNames(DBUtility db)
        {
            DataSet tables = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = 'U'");

            List<string> names = new List<string>();
            foreach (DataRow row in tables.Tables[0].Rows)
            {
                names.Add(row[0].ToString());
            }
            return names;
        }

        /// <summary>
        /// Lists the column names of <c>dbo.&lt;tableName&gt;</c> in declaration order.
        /// Returns an empty list when the table cannot be resolved in the dbo schema.
        /// </summary>
        private static List<string> GetColumnNames(DBUtility db, string tableName)
        {
            // The name goes into a string literal inside the query, so bracket-quote it and
            // double any single quotes; ExecuteDS offers no way to pass parameters.
            string objectName = "dbo.[" + tableName.Replace("]", "]]") + "]";
            string sql = "SELECT name FROM sys.columns WHERE object_id = OBJECT_ID(N'"
                + objectName.Replace("'", "''")
                + "') ORDER BY column_id"; // explicit order: callers rely on "first column = key"

            DataSet columns = db.ExecuteDS(sql);

            List<string> names = new List<string>();
            foreach (DataRow row in columns.Tables[0].Rows)
            {
                names.Add(row["name"].ToString());
            }
            return names;
        }
    }
}

DBUtility数据转移清洗同步帮助类

/// <summary>
/// Helper for moving, synchronising and de-duplicating data between a source and a
/// target SQL Server database.
/// </summary>
/// <remarks>
/// Connection settings come from appSettings: sourceServer, sourceDatabase, sourceUid,
/// sourcePassword and the matching target* keys. Instances hold open connections;
/// call <see cref="Dispose"/> when done.
/// </remarks>
public class DBUtility : IDisposable
{
    private string Server;
    private string Database;
    private string Uid;
    private string Password;
    public string connectionStr;
    private SqlConnection mySqlConn;
    private string targetServer;
    private string targetDatabase;
    private string targetUid;
    private string targetPassword;
    private string targetConnectionStr;
    private SqlConnection targetConn;

    /// <summary>
    /// Makes sure both the source and the target connection exist and are open.
    /// </summary>
    public void EnsureConnectionIsOpen()
    {
        mySqlConn = OpenIfNeeded(mySqlConn, this.connectionStr);
        targetConn = OpenIfNeeded(targetConn, this.targetConnectionStr);
    }

    /// <summary>
    /// Reads the connection settings from appSettings and builds both connection strings.
    /// </summary>
    /// <exception cref="ConfigurationErrorsException">A required appSettings key is missing.</exception>
    public DBUtility()
    {
        this.Server = ReadSetting("sourceServer");
        this.Database = ReadSetting("sourceDatabase");
        this.Uid = ReadSetting("sourceUid");
        this.Password = ReadSetting("sourcePassword");
        this.connectionStr = BuildConnectionString(this.Server, this.Database, this.Uid, this.Password);

        this.targetServer = ReadSetting("targetServer");
        this.targetDatabase = ReadSetting("targetDatabase");
        this.targetUid = ReadSetting("targetUid");
        this.targetPassword = ReadSetting("targetPassword");
        this.targetConnectionStr = BuildConnectionString(this.targetServer, this.targetDatabase, this.targetUid, this.targetPassword);
    }

    /// <summary>Runs a non-query statement on the given connection.</summary>
    /// <param name="sqlStr">SQL text to execute.</param>
    /// <param name="connStr">Connection to execute on (despite the name, a connection object).</param>
    /// <returns>The row count reported by <c>SqlCommand.ExecuteNonQuery</c>.</returns>
    public int ExecuteNonQuery(string sqlStr, SqlConnection connStr)
    {
        this.EnsureConnectionIsOpen();
        using (SqlCommand cmd = new SqlCommand(sqlStr, connStr))
        {
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteNonQuery();
        }
    }

    /// <summary>Runs a query on the source connection and returns its first column of the first row.</summary>
    public object ExecuteScalar(string sqlStr)
    {
        this.EnsureConnectionIsOpen();
        using (SqlCommand cmd = new SqlCommand(sqlStr, mySqlConn))
        {
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteScalar();
        }
    }

    /// <summary>Runs a query on the source connection and returns the result as a DataSet.</summary>
    public DataSet ExecuteDS(string sqlStr)
    {
        DataSet ds = new DataSet();
        this.EnsureConnectionIsOpen();
        using (SqlDataAdapter sda = new SqlDataAdapter(sqlStr, mySqlConn))
        {
            sda.Fill(ds);
        }
        return ds;
    }

    #region Transfer data
    /// <summary>
    /// Bulk-copies every row of a source table into the table of the same name on the target.
    /// </summary>
    /// <param name="TableName">Table to copy; may be schema-qualified.</param>
    /// <param name="PrimaryKeyName">
    /// Kept for backward compatibility and no longer used. It used to build a WHERE clause
    /// listing every key value unquoted, which failed for non-numeric keys and large tables
    /// while selecting the same rows as a plain SELECT.
    /// </param>
    public void BulkCopyTo(string TableName, string PrimaryKeyName)
    {
        string quotedTable = QuoteTableName(TableName);

        this.EnsureConnectionIsOpen();
        try
        {
            using (SqlConnection destinationConnector = new SqlConnection(targetConnectionStr))
            using (SqlCommand cmd = new SqlCommand("SELECT * FROM " + quotedTable, mySqlConn))
            {
                destinationConnector.Open();
                using (SqlDataReader reader = cmd.ExecuteReader())
                using (SqlBulkCopy bulkData = new SqlBulkCopy(destinationConnector))
                {
                    bulkData.DestinationTableName = quotedTable;
                    // Stream the source rows straight into the target table.
                    bulkData.WriteToServer(reader);
                }
            }
        }
        finally
        {
            // The source connection is closed after every transfer, even when it fails;
            // EnsureConnectionIsOpen reopens it on the next call.
            mySqlConn.Close();
        }
    }
    #endregion

    #region Update / synchronise data
    /// <summary>
    /// Pushes source values onto the matching rows of the same table on the target.
    /// </summary>
    /// <param name="targetTableName">Table to synchronise; may be schema-qualified.</param>
    /// <param name="columns">Columns to read; the first entry is the key used to match rows.</param>
    /// <param name="ignoreUpdateColumns">Columns that are never written; null means none.</param>
    /// <remarks>
    /// The key column is never written. Rows that exist on the target with identical values
    /// are skipped. Values are sent as parameters, so NULL stays NULL and quotes in the data
    /// cannot break the statement.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="columns"/> is null or empty.</exception>
    public void BulkUpdateTo(string targetTableName, List<string> columns, List<string> ignoreUpdateColumns)
    {
        if (columns == null || columns.Count == 0)
        {
            throw new ArgumentException("At least one column (the key column) is required.", "columns");
        }

        string primaryKeyName = columns[0]; // the first column is the key

        // Positions (within columns) of the columns that are actually written.
        List<int> updateIndexes = new List<int>();
        for (int i = 0; i < columns.Count; i++)
        {
            bool isKey = string.Equals(columns[i], primaryKeyName, StringComparison.OrdinalIgnoreCase);
            bool isIgnored = ignoreUpdateColumns != null && ignoreUpdateColumns.Contains(columns[i]);
            if (!isKey && !isIgnored)
            {
                updateIndexes.Add(i);
            }
        }
        if (updateIndexes.Count == 0)
        {
            return; // nothing to write
        }

        string quotedTable = QuoteTableName(targetTableName);
        List<string> quotedColumns = new List<string>();
        foreach (string column in columns)
        {
            quotedColumns.Add(QuoteName(column));
        }

        // Both sides are read with the same column list, so values line up by position.
        string selectSql = "SELECT " + string.Join(", ", quotedColumns.ToArray()) + " FROM " + quotedTable;

        List<string> assignments = new List<string>();
        for (int n = 0; n < updateIndexes.Count; n++)
        {
            assignments.Add(quotedColumns[updateIndexes[n]] + " = @p" + n);
        }
        string updateSql = "UPDATE " + quotedTable
            + " SET " + string.Join(", ", assignments.ToArray())
            + " WHERE " + quotedColumns[0] + " = @key";

        this.EnsureConnectionIsOpen();
        try
        {
            using (SqlConnection destinationConnector = new SqlConnection(targetConnectionStr))
            {
                destinationConnector.Open();

                Dictionary<string, object[]> recordsDest = ReadRows(destinationConnector, selectSql);
                Dictionary<string, object[]> recordsSource = ReadRows(mySqlConn, selectSql);

                using (SqlCommand update = new SqlCommand(updateSql, destinationConnector))
                {
                    foreach (KeyValuePair<string, object[]> record in recordsSource)
                    {
                        object[] sourceValues = record.Value;
                        object[] destValues;

                        // Skip only rows known to be identical on the target. Rows not found
                        // in the lookup are still sent so the database decides on the match.
                        if (recordsDest.TryGetValue(record.Key, out destValues)
                            && !HasChanges(sourceValues, destValues, updateIndexes))
                        {
                            continue;
                        }

                        update.Parameters.Clear();
                        for (int n = 0; n < updateIndexes.Count; n++)
                        {
                            update.Parameters.Add(CreateParameter("@p" + n, sourceValues[updateIndexes[n]]));
                        }
                        update.Parameters.Add(CreateParameter("@key", sourceValues[0]));
                        update.ExecuteNonQuery();
                    }
                }
            }
        }
        finally
        {
            mySqlConn.Close();
        }
    }
    #endregion

    #region Clean data (remove rows with a duplicate Name); adapt table/columns to the business need
    /// <summary>
    /// Deletes duplicate rows from Test2 on the target, keeping the row with the lowest ID
    /// for each Name.
    /// </summary>
    /// <returns>
    /// True when the statement ran, including when there was nothing to delete; false when
    /// SQL Server reported an error, which is written to the trace output.
    /// </returns>
    public bool CleartData()
    {
        // Rank rows per Name by ID and delete everything after the first, in one statement.
        const string sql = @"WITH Ranked AS (
                               SELECT ROW_NUMBER() OVER (PARTITION BY Name ORDER BY ID) AS NameSource
                               FROM Test2
                           )
                           DELETE FROM Ranked WHERE NameSource > 1;";
        try
        {
            this.EnsureConnectionIsOpen();
            ExecuteNonQuery(sql, targetConn);
            return true;
        }
        catch (SqlException ex)
        {
            System.Diagnostics.Trace.TraceError("CleartData failed: {0}", ex);
            return false;
        }
    }
    #endregion

    /// <summary>
    /// Compares two column values, treating null and DBNull as "no value".
    /// Other values are compared with <c>Equals</c>, so two distinct byte arrays count as different.
    /// </summary>
    private bool ColumnEqual(object A, object B)
    {
        bool aIsNull = A == null || A == DBNull.Value;
        bool bIsNull = B == null || B == DBNull.Value;
        if (aIsNull || bIsNull)
        {
            return aIsNull && bIsNull; // equal only when both are missing
        }
        return A.Equals(B);
    }

    /// <summary>Closes and releases both the source and the target connection.</summary>
    public void Dispose()
    {
        if (mySqlConn != null)
        {
            mySqlConn.Dispose();
            mySqlConn = null; // a disposed connection cannot be reopened; force a fresh one
        }
        if (targetConn != null)
        {
            targetConn.Dispose();
            targetConn = null;
        }
    }

    /// <summary>Returns an open connection, creating or reopening it as needed.</summary>
    private static SqlConnection OpenIfNeeded(SqlConnection connection, string connectionString)
    {
        if (connection == null)
        {
            connection = new SqlConnection(connectionString);
        }
        if (connection.State == ConnectionState.Closed)
        {
            connection.Open();
        }
        return connection;
    }

    /// <summary>Reads a required appSettings value, failing with a message that names the key.</summary>
    private static string ReadSetting(string key)
    {
        string value = ConfigurationManager.AppSettings[key];
        if (value == null)
        {
            throw new ConfigurationErrorsException("Missing appSettings entry '" + key + "'.");
        }
        return value;
    }

    /// <summary>Builds a SQL Server connection string; the builder escapes values such as a password containing ';'.</summary>
    private static string BuildConnectionString(string server, string database, string uid, string password)
    {
        SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
        builder.DataSource = server;
        builder.InitialCatalog = database;
        builder.UserID = uid;
        builder.Password = password;
        return builder.ConnectionString;
    }

    /// <summary>Reads all rows of a query into a lookup keyed by the text of the first column.</summary>
    private static Dictionary<string, object[]> ReadRows(SqlConnection connection, string selectSql)
    {
        Dictionary<string, object[]> rows = new Dictionary<string, object[]>();
        using (SqlCommand cmd = new SqlCommand(selectSql, connection))
        using (SqlDataReader reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                object[] values = new object[reader.FieldCount];
                reader.GetValues(values);
                rows[values[0].ToString()] = values; // a later row with the same key replaces the earlier one
            }
        }
        return rows;
    }

    /// <summary>Returns true when any of the listed positions differs between the two rows.</summary>
    private bool HasChanges(object[] sourceValues, object[] destValues, List<int> indexes)
    {
        foreach (int index in indexes)
        {
            if (!ColumnEqual(sourceValues[index], destValues[index]))
            {
                return true;
            }
        }
        return false;
    }

    /// <summary>Creates a parameter for a value read from the source, mapping null to DBNull.</summary>
    private static SqlParameter CreateParameter(string name, object value)
    {
        SqlParameter parameter = new SqlParameter(name, value ?? DBNull.Value);
        if (value is DateTime)
        {
            // The inferred type for DateTime is the narrower SQL datetime; datetime2 keeps
            // the full range and precision. NOTE(review): requires SQL Server 2008 or later.
            parameter.SqlDbType = SqlDbType.DateTime2;
        }
        return parameter;
    }

    /// <summary>Bracket-quotes a single identifier, doubling any closing bracket.</summary>
    private static string QuoteName(string name)
    {
        return "[" + name.Replace("]", "]]") + "]";
    }

    /// <summary>
    /// Bracket-quotes a table name part by part, so "dbo.Users" becomes "[dbo].[Users]".
    /// Parts that are already bracketed are unwrapped and re-quoted. Names that themselves
    /// contain a dot are not supported.
    /// </summary>
    private static string QuoteTableName(string name)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("A table name is required.", "name");
        }

        string[] parts = name.Split('.');
        for (int i = 0; i < parts.Length; i++)
        {
            string part = parts[i].Trim();
            if (part.Length == 0)
            {
                parts[i] = part; // keep "db..table" (default schema) as it is
                continue;
            }
            if (part.Length >= 2 && part[0] == '[' && part[part.Length - 1] == ']')
            {
                part = part.Substring(1, part.Length - 2).Replace("]]", "]");
            }
            parts[i] = QuoteName(part);
        }
        return string.Join(".", parts);
    }
}

 

以上是关于转移,清洗,同步数据的主要内容,如果未能解决你的问题,请参考以下文章

使用函数计算对表格存储中数据做简单清洗

[工作积累] UE4 并行渲染的同步 - Sync between FParallelCommandListSet & FRHICommandListImmediate calls(代码片段)

datax转移数据库可以返回进度吗

从同步自动故障转移 AlwaysOn 可用性组 AAG 传送日志

妙啊,这8个 Python 数据清洗代码好用到爆

妙不可言,这8个 Python 数据清洗代码好用到飞