转移,清洗,同步数据
Posted 一明
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转移,清洗,同步数据相关的知识,希望对你有一定的参考价值。
最近看了公司的数据导入、清洗和同步功能,想自己也实现一下。
首先用SqlBulkCopy批量导入;然后用ROW_NUMBER() OVER(PARTITION BY ...)按重复字段对数据分组编号,删除每组中行号大于1的重复数据(即每组只保留ID最小的一条);同步数据就是对源数据进行查询,然后批量更新目标数据。
我用MVC实现了下,代码实现如下:
前台代码
@{
    Layout = null;
}
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>数据同步</title>
</head>
<body>
    <input type="button" id="btnCopy" value="复制数据" />
    <input type="button" id="btnSync" value="同步已有变化的数据" />
    <input type="button" id="btnClear" value="清洗数据" />

    <script src="~/Scripts/jquery-1.10.2.min.js"></script>
    <script>
        // Wires a button to a POST endpoint that answers with the plain text "ok" on success.
        // The button is disabled while the request is in flight to prevent double submission.
        function bindAjaxButton(selector, url, successMessage, failureMessage) {
            var $button = $(selector);
            $button.click(function () {
                $.ajax({
                    type: "post",
                    url: url,
                    beforeSend: function () {
                        $button.prop("disabled", true);
                    },
                    success: function (data) {
                        alert(data === "ok" ? successMessage : failureMessage);
                    },
                    error: function (xhr) {
                        // An HTTP-level failure must be visible to the user, not only in the console.
                        console.error("error: " + xhr.responseText);
                        alert(failureMessage);
                    },
                    complete: function () {
                        $button.prop("disabled", false);
                    }
                });
            });
        }

        $(function () {
            // Url.Action keeps the links valid when the site is hosted under a virtual directory.
            bindAjaxButton("#btnCopy", "@Url.Action("CopyData", "Home")", "恭喜你,复制成功~", "复制失败!");
            bindAjaxButton("#btnSync", "@Url.Action("SyncClick3", "Home")", "恭喜你,同步成功~", "同步失败!");
            bindAjaxButton("#btnClear", "@Url.Action("SelectDistinct", "Home")", "恭喜你,清理成功~", "清理失败!");
        });
    </script>
</body>
</html>
后台代码
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Web;
using System.Web.Mvc;
using 数据同步.Models;

namespace 数据同步.Controllers
{
    /// <summary>
    /// Entry points for copying, synchronising and de-duplicating data between the
    /// source and the target database.
    /// Note: the source tables and the target tables must have the same structure and
    /// the same primary keys, otherwise the operations fail.
    /// </summary>
    public class HomeController : Controller
    {
        private const string Succeeded = "ok";
        private const string Failed = "no";

        public ActionResult Index()
        {
            return View();
        }

        /// <summary>
        /// Copies every user table of the source database into the target database.
        /// To copy a single table instead, call <c>db.BulkCopyTo("Users", "UserID")</c>.
        /// </summary>
        /// <returns>Plain text "ok" on success, "no" on any failure.</returns>
        [HttpPost]
        public ActionResult CopyData()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                Trace.TraceInformation("DB data is syncing!");

                foreach (string tableName in GetUserTableNames(db))
                {
                    Trace.TraceInformation(" syncing table:{0}", tableName);

                    List<string> columnNames = GetColumnNames(db, tableName);
                    if (columnNames.Count == 0)
                    {
                        // Happens e.g. for tables outside the dbo schema; nothing to key on.
                        Trace.TraceWarning("No columns found for table dbo.{0}; skipped.", tableName);
                        continue;
                    }

                    // Convention of this tool: the first column of a table is its primary key.
                    db.BulkCopyTo(tableName, columnNames[0]);
                    Trace.TraceInformation("Done sync data for table:{0}", tableName);
                }

                return Content(Succeeded);
            }
            catch (Exception exc)
            {
                // Report the failure the same way the other actions do instead of
                // rethrowing with a reset stack trace.
                Trace.TraceError("CopyData failed: {0}", exc);
                return Content(Failed);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>
        /// Pushes the current values of every source table to the matching rows of the target.
        /// To sync a single table instead, call
        /// <c>db.BulkUpdateTo("Users", columns, keyColumns)</c> directly.
        /// </summary>
        /// <returns>Plain text "ok" on success, "no" on any failure.</returns>
        [HttpPost]
        public ActionResult SyncClick3()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                Trace.TraceInformation("DB data is syncing!");
                Stopwatch stopwatch = Stopwatch.StartNew();

                // Columns that must never be overwritten on the target.
                List<string> ignoreUpdateColumns = new List<string>();
                ignoreUpdateColumns.Add("ID");

                foreach (string tableName in GetUserTableNames(db))
                {
                    Trace.TraceInformation(" syncing table:{0}", tableName);

                    List<string> columns;
                    if (tableName == "Customers")
                    {
                        // Customers is synced on a fixed column list; the first entry
                        // (CustomerName) is what BulkUpdateTo uses as the row key.
                        columns = new List<string>();
                        columns.Add("CustomerName");
                        columns.Add("CustomerId");
                        columns.Add("IsNewData");
                    }
                    else
                    {
                        columns = GetColumnNames(db, tableName);
                    }

                    if (columns.Count == 0)
                    {
                        Trace.TraceWarning("No columns found for table dbo.{0}; skipped.", tableName);
                        continue;
                    }

                    db.BulkUpdateTo(tableName, columns, ignoreUpdateColumns);
                    Trace.TraceInformation("Done sync data for table:{0}", tableName);
                }

                stopwatch.Stop();
                Trace.TraceInformation("cost total seconds:{0}", stopwatch.Elapsed.TotalSeconds);
                return Content(Succeeded);
            }
            catch (Exception ex)
            {
                Trace.TraceError("SyncClick3 failed: {0}", ex);
                return Content(Failed);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>
        /// Removes duplicate rows from the target database (see <c>DBUtility.CleartData</c>).
        /// Restricted to POST because it deletes data.
        /// </summary>
        /// <returns>Plain text "ok" when the helper reports success, otherwise "no".</returns>
        [HttpPost]
        public ActionResult SelectDistinct()
        {
            DBUtility db = null;
            try
            {
                db = new DBUtility();
                return Content(db.CleartData() ? Succeeded : Failed);
            }
            catch (Exception ex)
            {
                Trace.TraceError("SelectDistinct failed: {0}", ex);
                return Content(Failed);
            }
            finally
            {
                if (db != null)
                {
                    db.Dispose();
                }
            }
        }

        /// <summary>Returns the names of all user tables (xtype 'U') of the source database.</summary>
        private static List<string> GetUserTableNames(DBUtility db)
        {
            DataSet ds = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = 'U'");
            List<string> names = new List<string>();
            foreach (DataRow row in ds.Tables[0].Rows)
            {
                names.Add(row[0].ToString());
            }
            return names;
        }

        /// <summary>
        /// Returns the column names of <c>dbo.&lt;tableName&gt;</c> in declaration order,
        /// or an empty list when the table cannot be resolved in the dbo schema.
        /// </summary>
        private static List<string> GetColumnNames(DBUtility db, string tableName)
        {
            // Bracket-quote the identifier, then escape it for use inside a string literal.
            string objectName = "dbo.[" + tableName.Replace("]", "]]") + "]";
            string sql = "SELECT name FROM sys.columns WHERE object_id = OBJECT_ID(N'"
                + objectName.Replace("'", "''")
                + "') ORDER BY column_id";

            DataSet ds = db.ExecuteDS(sql);
            List<string> names = new List<string>();
            foreach (DataRow row in ds.Tables[0].Rows)
            {
                names.Add(row["name"].ToString());
            }
            return names;
        }
    }
}
DBUtility:数据转移、清洗、同步帮助类
/// <summary>
/// Helper that copies, synchronises and de-duplicates data between a source and a
/// target SQL Server database. Connection settings are read from appSettings keys
/// prefixed with "source" and "target" (Server, Database, Uid, Password).
/// </summary>
public class DBUtility : System.IDisposable
{
    /// <summary>Connection string of the source database.</summary>
    public string connectionStr;

    private readonly string targetConnectionStr;
    private SqlConnection mySqlConn;
    private SqlConnection targetConn;

    /// <summary>Builds both connection strings from the application configuration.</summary>
    /// <exception cref="ConfigurationErrorsException">A required appSettings key is missing.</exception>
    public DBUtility()
    {
        this.connectionStr = BuildConnectionString("source");
        this.targetConnectionStr = BuildConnectionString("target");
    }

    /// <summary>Opens the source and target connections, creating them on first use.</summary>
    public void EnsureConnectionIsOpen()
    {
        if (mySqlConn == null)
        {
            mySqlConn = new SqlConnection(this.connectionStr);
        }
        OpenIfNeeded(mySqlConn);

        if (targetConn == null)
        {
            targetConn = new SqlConnection(this.targetConnectionStr);
        }
        OpenIfNeeded(targetConn);
    }

    /// <summary>Executes a non-query statement on the given connection.</summary>
    /// <param name="sqlStr">SQL text to execute.</param>
    /// <param name="connStr">Connection to run the command on (despite the name, a connection object).</param>
    /// <returns>The row count reported by <c>SqlCommand.ExecuteNonQuery</c>.</returns>
    public int ExecuteNonQuery(string sqlStr, SqlConnection connStr)
    {
        this.EnsureConnectionIsOpen();
        using (SqlCommand cmd = new SqlCommand(sqlStr, connStr))
        {
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteNonQuery();
        }
    }

    /// <summary>Executes a query on the source database and returns its first column of the first row.</summary>
    public object ExecuteScalar(string sqlStr)
    {
        this.EnsureConnectionIsOpen();
        using (SqlCommand cmd = new SqlCommand(sqlStr, mySqlConn))
        {
            cmd.CommandType = CommandType.Text;
            return cmd.ExecuteScalar();
        }
    }

    /// <summary>Executes a query on the source database and returns the result as a DataSet.</summary>
    public DataSet ExecuteDS(string sqlStr)
    {
        DataSet ds = new DataSet();
        this.EnsureConnectionIsOpen();
        using (SqlDataAdapter sda = new SqlDataAdapter(sqlStr, mySqlConn))
        {
            sda.Fill(ds);
        }
        return ds;
    }

    #region Copy data
    /// <summary>
    /// Bulk-copies the rows of a source table into the table of the same name on the target.
    /// </summary>
    /// <param name="TableName">Table to copy; may be schema-qualified.</param>
    /// <param name="PrimaryKeyName">
    /// Key column. Rows whose key is NULL are not copied; pass null or empty to copy every row.
    /// </param>
    public void BulkCopyTo(string TableName, string PrimaryKeyName)
    {
        string table = QuoteTableName(TableName);
        string sql = "SELECT * FROM " + table;
        if (!string.IsNullOrEmpty(PrimaryKeyName))
        {
            // Equivalent to matching every existing key value one by one, without
            // building a statement that grows with the number of rows.
            sql += " WHERE " + QuoteName(PrimaryKeyName) + " IS NOT NULL";
        }

        try
        {
            this.EnsureConnectionIsOpen();
            using (SqlCommand cmd = new SqlCommand(sql, mySqlConn))
            using (SqlDataReader reader = cmd.ExecuteReader())
            using (SqlBulkCopy bulkData = new SqlBulkCopy(targetConn))
            {
                bulkData.DestinationTableName = table;
                bulkData.WriteToServer(reader); // streams source rows straight to the target
            }
        }
        finally
        {
            CloseConnections();
        }
    }
    #endregion

    #region Update / sync data
    /// <summary>
    /// Updates rows of the target table with the values of the source rows that have the
    /// same key. Only rows that exist on the target and differ from the source are written.
    /// </summary>
    /// <param name="targetTableName">Table to sync; the same name is used on source and target.</param>
    /// <param name="columns">Columns to sync; the first entry is the key column used to match rows.</param>
    /// <param name="ignoreUpdateColumns">Columns that must not be overwritten; may be null.</param>
    /// <exception cref="System.ArgumentException"><paramref name="columns"/> is null or empty.</exception>
    public void BulkUpdateTo(string targetTableName, List<string> columns, List<string> ignoreUpdateColumns)
    {
        if (columns == null || columns.Count == 0)
        {
            throw new System.ArgumentException("At least the key column must be supplied.", "columns");
        }

        string primaryKeyName = columns[0];

        // The key column is only used for matching; assigning it to itself is pointless
        // and fails outright on identity columns.
        List<string> updateColumns = new List<string>();
        foreach (string column in columns)
        {
            bool ignored = ignoreUpdateColumns != null && ignoreUpdateColumns.Contains(column);
            if (!ignored && column != primaryKeyName && !updateColumns.Contains(column))
            {
                updateColumns.Add(column);
            }
        }
        if (updateColumns.Count == 0)
        {
            return;
        }

        string table = QuoteTableName(targetTableName);
        string quotedKey = QuoteName(primaryKeyName);
        List<string> selectList = new List<string>();
        List<string> assignments = new List<string>();
        selectList.Add(quotedKey);
        for (int i = 0; i < updateColumns.Count; i++)
        {
            string quoted = QuoteName(updateColumns[i]);
            selectList.Add(quoted);
            assignments.Add(quoted + " = @p" + i);
        }
        string selectSql = "SELECT " + string.Join(", ", selectList.ToArray()) + " FROM " + table;
        string updateSql = "UPDATE " + table + " SET " + string.Join(", ", assignments.ToArray())
            + " WHERE " + quotedKey + " = @key";

        try
        {
            this.EnsureConnectionIsOpen();

            // The target reader must be finished before UPDATEs run on the same connection.
            Dictionary<string, object[]> targetRows = ReadRowsByKey(selectSql, targetConn);

            using (SqlCommand select = new SqlCommand(selectSql, mySqlConn))
            using (SqlDataReader reader = select.ExecuteReader())
            using (SqlCommand update = new SqlCommand(updateSql, targetConn))
            {
                // Type the parameters from the result-set schema so NULLs and binary
                // values are sent with the column's own type.
                DataTable schema = reader.GetSchemaTable();
                for (int i = 0; i < updateColumns.Count; i++)
                {
                    update.Parameters.Add(new SqlParameter("@p" + i, (SqlDbType)(int)schema.Rows[i + 1]["ProviderType"]));
                }
                update.Parameters.Add(new SqlParameter("@key", (SqlDbType)(int)schema.Rows[0]["ProviderType"]));

                while (reader.Read())
                {
                    object[] sourceValues = new object[reader.FieldCount];
                    reader.GetValues(sourceValues);
                    string key = sourceValues[0].ToString();

                    object[] targetValues;
                    if (!targetRows.TryGetValue(key, out targetValues))
                    {
                        continue; // no such row on the target: nothing to update
                    }
                    if (!RowDiffers(sourceValues, targetValues))
                    {
                        continue; // already in sync
                    }

                    for (int i = 0; i < updateColumns.Count; i++)
                    {
                        update.Parameters[i].Value = sourceValues[i + 1];
                    }
                    update.Parameters[updateColumns.Count].Value = sourceValues[0];
                    update.ExecuteNonQuery();

                    // Keep the snapshot current so a later source row with the same key
                    // is compared against what the target now holds.
                    targetRows[key] = sourceValues;
                }
            }
        }
        finally
        {
            CloseConnections();
        }
    }
    #endregion

    #region Clean data -- removes rows with a duplicate Name; adapt the table/column to the business need
    /// <summary>
    /// Deletes from Test2 on the target every row that is not the lowest-ID row of its Name group.
    /// </summary>
    /// <returns>
    /// True when the batch reported at least one affected row.
    /// NOTE(review): the count comes from ExecuteNonQuery over the whole batch and may
    /// include the rows staged by SELECT ... INTO, not only the deleted ones; confirm.
    /// </returns>
    public bool CleartData()
    {
        const string sql = @"select ROW_NUMBER() OVER(PARTITION BY Name ORDER BY ID) NameSource,ID into #1 from Test2
delete Test2 where ID in(select ID from #1 where NameSource>1)
drop table #1";

        try
        {
            this.EnsureConnectionIsOpen(); // targetConn must exist before it is passed below
            int n = ExecuteNonQuery(sql, targetConn);
            return n > 0;
        }
        finally
        {
            CloseConnections();
        }
    }
    #endregion

    /// <summary>Closes and releases both connections. The instance may be reused afterwards.</summary>
    public void Dispose()
    {
        if (mySqlConn != null)
        {
            mySqlConn.Dispose();
            mySqlConn = null;
        }
        if (targetConn != null)
        {
            targetConn.Dispose();
            targetConn = null;
        }
    }

    /// <summary>Compares two column values, treating two DBNulls as equal and byte arrays by content.</summary>
    private static bool ColumnEqual(object A, object B)
    {
        if (A == DBNull.Value && B == DBNull.Value)
        {
            return true;
        }
        if (A == DBNull.Value || B == DBNull.Value)
        {
            return false;
        }

        byte[] bytesA = A as byte[];
        byte[] bytesB = B as byte[];
        if (bytesA != null && bytesB != null)
        {
            if (bytesA.Length != bytesB.Length)
            {
                return false;
            }
            for (int i = 0; i < bytesA.Length; i++)
            {
                if (bytesA[i] != bytesB[i])
                {
                    return false;
                }
            }
            return true;
        }

        return A.Equals(B);
    }

    /// <summary>Returns true when any non-key value (index 1 and up) differs between the two rows.</summary>
    private static bool RowDiffers(object[] sourceValues, object[] targetValues)
    {
        for (int i = 1; i < sourceValues.Length; i++)
        {
            if (!ColumnEqual(sourceValues[i], targetValues[i]))
            {
                return true;
            }
        }
        return false;
    }

    /// <summary>Runs the query and indexes the rows by the string form of their first column.</summary>
    private static Dictionary<string, object[]> ReadRowsByKey(string sql, SqlConnection connection)
    {
        Dictionary<string, object[]> rows = new Dictionary<string, object[]>();
        using (SqlCommand cmd = new SqlCommand(sql, connection))
        using (SqlDataReader reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                object[] values = new object[reader.FieldCount];
                reader.GetValues(values);
                rows[values[0].ToString()] = values;
            }
        }
        return rows;
    }

    /// <summary>Builds a connection string from the appSettings keys that start with the given prefix.</summary>
    private static string BuildConnectionString(string prefix)
    {
        // The builder escapes values, so a ';' in a password cannot corrupt the string.
        SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
        builder.DataSource = ReadSetting(prefix + "Server");
        builder.InitialCatalog = ReadSetting(prefix + "Database");
        builder.UserID = ReadSetting(prefix + "Uid");
        builder.Password = ReadSetting(prefix + "Password");
        return builder.ConnectionString;
    }

    /// <summary>Reads a required appSettings value, naming the key when it is missing.</summary>
    private static string ReadSetting(string key)
    {
        string value = ConfigurationManager.AppSettings[key];
        if (value == null)
        {
            throw new ConfigurationErrorsException("Missing appSettings key '" + key + "'.");
        }
        return value;
    }

    /// <summary>Opens the connection unless it is already open, recycling a broken one first.</summary>
    private static void OpenIfNeeded(SqlConnection connection)
    {
        if (connection.State == System.Data.ConnectionState.Broken)
        {
            connection.Close();
        }
        if (connection.State == System.Data.ConnectionState.Closed)
        {
            connection.Open();
        }
    }

    /// <summary>Closes both connections if they exist; they are reopened on the next use.</summary>
    private void CloseConnections()
    {
        if (mySqlConn != null)
        {
            mySqlConn.Close();
        }
        if (targetConn != null)
        {
            targetConn.Close();
        }
    }

    /// <summary>Bracket-quotes a single identifier, escaping any closing bracket inside it.</summary>
    private static string QuoteName(string identifier)
    {
        return "[" + identifier.Replace("]", "]]") + "]";
    }

    /// <summary>
    /// Bracket-quotes each dot-separated part of a table name. A name that already
    /// contains brackets is assumed to be quoted by the caller and returned unchanged.
    /// </summary>
    private static string QuoteTableName(string tableName)
    {
        if (string.IsNullOrEmpty(tableName))
        {
            throw new System.ArgumentException("Table name must not be empty.", "tableName");
        }
        if (tableName.IndexOf('[') >= 0)
        {
            return tableName;
        }

        string[] parts = tableName.Split('.');
        for (int i = 0; i < parts.Length; i++)
        {
            parts[i] = QuoteName(parts[i]);
        }
        return string.Join(".", parts);
    }
}
以上是关于转移,清洗,同步数据的主要内容,如果未能解决你的问题,请参考以下文章
[工作积累] UE4 并行渲染的同步 - Sync between FParallelCommandListSet & FRHICommandListImmediate calls(代码片段)