闲来无事,编写一个数据迁移小工具
Posted NIK
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了闲来无事,编写一个数据迁移小工具相关的知识,希望对你有一定的参考价值。
一、前言
生命不息,折腾不止。近期公司有数据迁移的计划,从Sqlserver迁移到mysql,虽说网上有很多数据迁移方案,但闲着也是闲着,就自己整一个,权当做是练练手了
二、解决思路
整个迁移过程类似于ETL,将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端。读取并转换sqlserver库数据,将数据解析为csv文件,载入文件到mysql。流程如下:
- 抽取、转换
此过程主要是处理源数据库与目标数据库表字段的映射关系。为了保证程序的通用性,通过配置文件映射字段关系,解析配置文件并生成数据库脚本。
- 加载
数据迁移的时候最好不要用INSERT语句批量插入,这样数据量稍稍大一点就很慢。sqlserver可通过SqlBulkCopy将DataTable对象快速插入到数据库,然而mysql并没有这东西,查阅资料后发现mysql可通过MySqlBulkLoader将csv文件快速导入到数据库。经测试,迁移10K条数据MySqlBulkLoader可在1s内处理完,速度还是相当不错的。
三、实现
- 配置文件
db_caption.xml(数据库),主要用来存储表描述文件名,若待迁移的表不存在外键关系即迁移时不用考虑先后顺序,此配置文件可以不要。其中maxClients参数指的是异步迁移时,最大并发数。
<?xml version="1.0" encoding="utf-8" ?> <root> <maxClients value="3"></maxClients> <tables> <table filename="t_drawtemplate.xml" caption="抽奖模板"></table> <table filename="t_drawprize.xml" caption="抽奖奖品"></table> <table filename="t_drawrecord.xml" caption="抽奖记录"></table> <table filename="t_drawwinner.xml" caption="中奖记录"></table> </tables> </root>
t_table.xml(表),主要用来描述待迁移表信息及字段描述
<?xml version="1.0" encoding="utf-8" ?> <root> <![CDATA[抽奖记录]]> <!--是否分页,默认不分页就好啦,false_不分页--> <isPaging value="true"></isPaging> <pageSize value="10000"></pageSize> <!--mssql数据库表主键--> <primaryKey value="DrawRecordId"></primaryKey> <!--mssql数据库表名--> <msTable value="DrawRecord"></msTable> <!--mysql数据库表名--> <myTable value="t_drawrecord"></myTable> <!--筛选条件,无特殊情况为空即可--> <filter value="1=1"></filter> <!--字段映射--> <fields> <field msName ="DrawRecordId" myName="id"></field> <field msName ="FK_MemberId" myName="user_id"></field> <field msName ="Remark" myName="remark"></field> <field msName ="DataStatus" myName="data_status"></field> <field msName ="DrawTime" myName="drawTime"></field> <!--需要调整字段示例--> <field msName ="CASE WHEN DrawWinnerId >0 THEN DrawWinnerId END" myName="drawwinner_id"></field> </fields> <!--迁移完成后,数据修复脚本,主要用来修正日期类型为0000-00-00 00:00:00问题--> <fixSql></fixSql> </root>
- 创建xml文件映射对象并重写ToString方法,将对象解析为sql
db_caption.xml映射对象
/// <summary>
/// Database description (db_caption): a connection limit plus the list of
/// table description entries.
/// </summary>
internal class DBCaptionModel
{
    /// <summary>
    /// Creates a description whose table list starts out empty.
    /// </summary>
    public DBCaptionModel()
    {
        Tables = new List<TableModel>();
    }

    /// <summary>
    /// Table entries. The list instance is assigned once in the constructor;
    /// callers add to it but cannot replace it.
    /// </summary>
    public IList<TableModel> Tables { get; private set; }

    /// <summary>
    /// Maximum number of connections.
    /// </summary>
    public int MaxClients { get; set; }
}

/// <summary>
/// One table entry of the database description.
/// </summary>
internal class TableModel
{
    /// <summary>
    /// File name of the table's XML description.
    /// </summary>
    public string FileName { get; set; }

    /// <summary>
    /// Human-readable caption of the table.
    /// </summary>
    public string Caption { get; set; }

    /// <summary>
    /// Whether the table has already been synchronized.
    /// </summary>
    public bool IsSync { get; set; }
}
/// <summary>
/// Table description: source/target table names, paging settings and field
/// mappings. <see cref="ToString"/> renders the SELECT statement that reads
/// the source table.
/// </summary>
/// <remarks>
/// Table names, the primary key, the filter and the field expressions are
/// concatenated into the SQL text as-is, so they must come from a trusted
/// source.
/// </remarks>
internal class TableCaptionModel
{
    public TableCaptionModel()
    {
        this.Fields = new List<FieldModel>();
    }

    /// <summary>
    /// Whether the source table is read page by page.
    /// </summary>
    public bool IsPaging { get; set; }

    /// <summary>
    /// Page size.
    /// </summary>
    public int PageSize { get; set; }

    /// <summary>
    /// Source table name.
    /// </summary>
    public string SourceTableName { get; set; }

    /// <summary>
    /// Target table name.
    /// </summary>
    public string TargetTableName { get; set; }

    /// <summary>
    /// Primary key of the source table; used for ordering and for paging.
    /// </summary>
    public string PrimaryKey { get; set; }

    /// <summary>
    /// Filter condition (the text that follows WHERE); may be null or blank.
    /// </summary>
    public string Filter { get; set; }

    /// <summary>
    /// Field mappings.
    /// </summary>
    public List<FieldModel> Fields { get; set; }

    /// <summary>
    /// Data-repair script to run after the migration has finished.
    /// </summary>
    public string FixSql { get; set; }

    /// <summary>
    /// Builds the SELECT statement for the source table.
    /// </summary>
    /// <returns>
    /// "SELECT ... FROM table", followed by " WHERE ..." when there is a
    /// condition and by " ORDER BY PrimaryKey" when a primary key is set.
    /// With paging enabled the statement references the @StartIndex and
    /// @EndIndex parameters.
    /// </returns>
    public override string ToString()
    {
        string sql = GetBaseSql();
        string filter = GetFilterSql();
        if (!string.IsNullOrWhiteSpace(filter))
        {
            sql += " WHERE " + filter;
        }

        // Without a primary key there is nothing to order by; appending a
        // bare " ORDER BY " would produce invalid SQL.
        if (!string.IsNullOrWhiteSpace(this.PrimaryKey))
        {
            sql += " ORDER BY " + this.PrimaryKey;
        }

        return sql;
    }

    /// <summary>
    /// Builds the base query.
    /// </summary>
    /// <remarks>
    /// Shape: SELECT SourceField AS TargetField, ... FROM table.
    /// When no field mappings are defined, all columns are selected.
    /// </remarks>
    /// <returns>The SELECT ... FROM part of the statement.</returns>
    private string GetBaseSql()
    {
        // The previous implementation always removed the last character to
        // drop a trailing comma; with no fields that cut "SELECT" down to
        // "SELEC". Joining avoids the trailing comma altogether.
        string columns = (this.Fields == null || this.Fields.Count == 0)
            ? "*"
            : string.Join(", ", this.Fields);

        return "SELECT " + columns + " FROM " + this.SourceTableName;
    }

    /// <summary>
    /// Builds the WHERE condition.
    /// </summary>
    /// <remarks>
    /// Without paging this is <see cref="Filter"/> unchanged. With paging it is
    /// "PrimaryKey IN (SELECT PrimaryKey FROM (SELECT ROW_NUMBER() ... FROM
    /// table [WHERE filter]) T WHERE RowNo BETWEEN @StartIndex AND @EndIndex)".
    /// </remarks>
    /// <returns>The condition text, or null/blank when there is none.</returns>
    private string GetFilterSql()
    {
        if (!this.IsPaging)
        {
            return this.Filter;
        }

        StringBuilder sb = new StringBuilder();
        sb.AppendFormat("SELECT {0} FROM (", this.PrimaryKey);
        sb.AppendFormat("SELECT ROW_NUMBER() OVER(ORDER BY {0}) RowNo,{0} FROM {1}", this.PrimaryKey, this.SourceTableName);

        if (!string.IsNullOrWhiteSpace(this.Filter))
        {
            sb.Append(" WHERE ").Append(this.Filter);
        }

        // Plain Append: this literal has no placeholders to format.
        sb.Append(") T WHERE RowNo BETWEEN @StartIndex AND @EndIndex");

        return string.Format("{0} IN ({1})", this.PrimaryKey, sb);
    }
}

/// <summary>
/// Mapping of one source column (or SQL expression) to a target column.
/// </summary>
internal class FieldModel
{
    /// <summary>
    /// Source field name, or a SQL expression.
    /// </summary>
    public string SourceFieldName { get; set; }

    /// <summary>
    /// Target field name.
    /// </summary>
    public string TargetFieldName { get; set; }

    /// <summary>
    /// Renders the mapping as a select-list item.
    /// </summary>
    /// <returns>
    /// "[SourceFieldName] AS 'TargetFieldName'" for a plain column name, or
    /// "SourceFieldName AS 'TargetFieldName'" when the source contains a
    /// space or '(' and is therefore treated as an expression.
    /// </returns>
    public override string ToString()
    {
        // Treat a missing source name as empty instead of throwing from ToString.
        string source = this.SourceFieldName ?? string.Empty;

        bool isExpression = source.IndexOfAny(new char[] { ' ', '(' }) >= 0;
        if (isExpression)
        {
            return string.Format("{0} AS '{1}'", source, this.TargetFieldName);
        }

        // Plain column name: bracket-quote it.
        return string.Format("[{0}] AS '{1}'", source, this.TargetFieldName);
    }
}
- 解析XML文件
XML解析可通过XmlSerializer直接反序列化为对象,此处只是为了温习XML解析方式,故采用XmlDocument手动解析的方式。
/// <summary>
/// Loads the database description from "db_caption.xml" under CONN_XML_PATH.
/// </summary>
/// <returns>
/// A model holding the "maxClients" value and one <see cref="TableModel"/>
/// per "table" element.
/// </returns>
private static DBCaptionModel LoadDBCaption()
{
    DBCaptionModel model = new DBCaptionModel();

    XmlDocument doc = new XmlDocument();
    doc.Load(CONN_XML_PATH + "db_caption.xml");

    XmlNode root = doc.SelectSingleNode("root");
    // Maximum number of connections.
    model.MaxClients = root.SelectSingleNode("maxClients").GetAttribute<int>("value");

    // Table descriptions.
    XmlNodeList tables = root.SelectSingleNode("tables").SelectNodes("table");
    foreach (XmlNode node in tables)
    {
        model.Tables.Add(new TableModel
        {
            FileName = node.GetAttribute("filename"),
            Caption = node.GetAttribute("caption")
        });
    }

    return model;
}

/// <summary>
/// Loads one table description XML file from under CONN_XML_PATH.
/// </summary>
/// <param name="fileName">File name of the table description XML.</param>
/// <returns>The populated table description.</returns>
private static TableCaptionModel LoadTableCaption(string fileName)
{
    XmlDocument doc = new XmlDocument();
    doc.Load(CONN_XML_PATH + fileName);

    TableCaptionModel model = new TableCaptionModel();

    XmlNode root = doc.SelectSingleNode("root");
    model.IsPaging = root.SelectSingleNode("isPaging").GetAttribute<bool>("value");
    if (model.IsPaging)
    {
        // The page size is only read when paging is enabled.
        model.PageSize = root.SelectSingleNode("pageSize").GetAttribute<int>("value");
    }
    model.SourceTableName = root.SelectSingleNode("msTable").GetAttribute("value");
    model.TargetTableName = root.SelectSingleNode("myTable").GetAttribute("value");
    model.PrimaryKey = root.SelectSingleNode("primaryKey").GetAttribute("value");
    // The "filter" element was previously never read, so the configured
    // condition was ignored when the query was generated.
    model.Filter = root.SelectSingleNode("filter").GetAttribute("value");
    // NOTE(review): fixSql is read from a "value" attribute; confirm the
    // script is not expected in the element body instead.
    model.FixSql = root.SelectSingleNode("fixSql").GetAttribute("value");

    XmlNodeList fields = root.SelectSingleNode("fields").SelectNodes("field");

    foreach (XmlNode field in fields)
    {
        model.Fields.Add(new FieldModel
        {
            SourceFieldName = field.GetAttribute("msName"),
            TargetFieldName = field.GetAttribute("myName")
        });
    }

    return model;
}
XmlNode.GetAttribute扩展方法,简化读取节点属性的代码
1 public static class XmlNodeExtension 2 { 3 /// <summary> 4 /// 获取节点属性 5 /// </summary> 6 /// <param name="node">当前节点</param> 7 /// <param name="attrName">属性名称</param> 8 /// <returns></returns> 9 public static string GetAttribute(this XmlNode node, string attrName) 10 { 11 if (node == null) 12 { 13 return null; 14 } 15 return ((XmlElement)node).GetAttribute(attrName); 16 } 17 18 /// <summary> 19 /// 获取节点属性 20 /// </summary> 21 /// <param name="node">当前节点</param> 22 /// <param name="attrName">属性名称</param> 23 /// <returns></returns> 24 public static T GetAttribute<T>(this XmlNode node, string attrName) where T : struct 25 { 26
以上是关于闲来无事,编写一个数据迁移小工具的主要内容,如果未能解决你的问题,请参考以下文章