闲来无事,编写一个数据迁移小工具

Posted NIK

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了闲来无事,编写一个数据迁移小工具相关的知识,希望对你有一定的参考价值。

一、前言

  生命不息,折腾不止。近期公司有数据迁移的计划,从Sqlserver迁移到mysql,虽说网上有很多数据迁移方案,但闲着也是闲着,就自己整一个,权当做是练练手了

二、解决思路

  整个迁移过程类似于ETL,将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端。读取并转换sqlserver库数据,将数据解析为csv文件,载入文件到mysql。流程如下:

  1. 抽取、转换
    此过程主要是处理源数据库与目标数据库表字段的映射关系,为了保证程序的通用性,通过配置文件映射字段关系,解析配置文件并生成数据库脚本
  2. 加载
    数据迁移的时候最好不要用INSERT语句插入批量插入,这样数据量稍稍大一点就很慢。sqlserver可通过SqlBulkCopy将DataTable对象快速插入到数据库,然后mysql并没有这东西,查阅资料后发现mysql可通过MySqlBulkLoader将csv文件快速导入到数据库。经测试迁移10K条数据MySqlBulkLoader可在1S内处理完,速度还是相当不错的

  

三、实现

  1. 配置文件
    db_caption.xml(数据库),主要用来存储表描述文件名,若待迁移的表不存在外键关系即迁移时不用考虑先后顺序,此配置文件可以不要。其中maxClients参数指的是异步迁移时,最大并发数。
    <?xml version="1.0" encoding="utf-8" ?>
    <root>
      <maxClients value="3"></maxClients>
      <tables>
        <table filename="t_drawtemplate.xml" caption="抽奖模板"></table>
        <table filename="t_drawprize.xml" caption="抽奖奖品"></table>
        <table filename="t_drawrecord.xml" caption="抽奖记录"></table>
        <table filename="t_drawwinner.xml" caption="中奖记录"></table>
      </tables>
    </root>

    t_table.xml(表),主要用来描述待迁移表信息及字段描述

    <?xml version="1.0" encoding="utf-8" ?>
    <root>
      <![CDATA[抽奖记录]]>
      <!--是否分页,默认不分页就好啦,false_不分页-->
      <isPaging value="true"></isPaging>
      <pageSize value="10000"></pageSize>
    
      <!--mssql数据库表主键-->
      <primaryKey value="DrawRecordId"></primaryKey>
      <!--mssql数据库表名-->
      <msTable value="DrawRecord"></msTable>
      <!--mysql数据库表名-->
      <myTable value="t_drawrecord"></myTable>
      <!--筛选条件,无特殊情况为空即可-->
      <filter value="1=1"></filter>
      <!--字段映射-->
      <fields>
        <field msName ="DrawRecordId" myName="id"></field>
        <field msName ="FK_MemberId" myName="user_id"></field>
        <field msName ="Remark" myName="remark"></field>
        <field msName ="DataStatus" myName="data_status"></field>
        <field msName ="DrawTime" myName="drawTime"></field>
        <!--需要调整字段示例-->
        <field msName ="CASE WHEN DrawWinnerId >0 THEN DrawWinnerId END" myName="drawwinner_id"></field>
      </fields>
      <!--迁移完成后,数据修复脚本,主要用来修正日期类型为0000-00-00 00:00:00问题-->
      <fixSql></fixSql>
    </root>
  2. 创建xml文件映射对象并重写ToString方法,将对象解析为sql
    db_caption.xml映射对象
     1 /// <summary>
     2 /// 数据库描述类(db_caption)
     3 /// </summary>
     4 internal class DBCaptionModel
     5 {
     6     public DBCaptionModel()
     7     {
     8         this.Tables = new List<TableModel>();
     9     }
    10 
    11     /// <summary>
    12     /// 最大连接数
    13     /// </summary>
    14     public int MaxClients { get; set; }
    15 
    16     /// <summary>
    17     /// 表集合
    18     /// </summary>
    19     public IList<TableModel> Tables { get; private set; }
    20 }
    21 
    22 internal class TableModel
    23 {
    24     /// <summary>
    25     /// 表xml文件名
    26     /// </summary>
    27     public string FileName { get; set; }
    28 
    29     /// <summary>
    30     /// 描述
    31     /// </summary>
    32     public string Caption { get; set; }
    33 
    34     /// <summary>
    35     /// 是否已同步
    36     /// </summary>
    37     public bool IsSync { get; set; }
    38 }
    t_table.xml映射对象
      1 /// <summary>
      2 /// 表描述类
      3 /// </summary>
      4 internal class TableCaptionModel
      5 {
      6     public TableCaptionModel()
      7     {
      8         this.Fields = new List<FieldModel>();
      9     }
     10 
     11     /// <summary>
     12     /// 是否分页
     13     /// </summary>
     14     public bool IsPaging { get; set; }
     15 
     16     /// <summary>
     17     /// 分页大小
     18     /// </summary>
     19     public int PageSize { get; set; }
     20 
     21     /// <summary>
     22     /// 源数据表表名
     23     /// </summary>
     24     public string SourceTableName { get; set; }
     25 
     26     /// <summary>
     27     /// 目标数据表表名
     28     /// </summary>
     29     public string TargetTableName { get; set; }
     30 
     31     /// <summary>
     32     /// 源数据表主键
     33     /// </summary>
     34     public string PrimaryKey { get; set; }
     35 
     36     /// <summary>
     37     /// 过滤条件
     38     /// </summary>
     39     public string Filter { get; set; }
     40 
     41     /// <summary>
     42     /// 字段集合
     43     /// </summary>
     44     public List<FieldModel> Fields { get; set; }
     45 
     46     /// <summary>
     47     /// 数据迁移完成后,数据修复脚本
     48     /// </summary>
     49     public string FixSql { get; set; }
     50 
     51     /// <summary>
     52     /// ToString
     53     /// </summary>
     54     /// <returns>sql</returns>
     55     public override string ToString()
     56     {
     57         string sql = GetBaseSql();
     58         string filter = GetFilterSql();
     59         if (!string.IsNullOrWhiteSpace(filter))
     60         {
     61             sql += " WHERE " + filter;
     62         }
     63 
     64         sql += " ORDER BY " + this.PrimaryKey;
     65         return sql;
     66     }
     67 
     68     /// <summary>
     69     /// 获取基础查询Sql
     70     /// </summary>
     71     /// <![CDATA[SELECT SourceField AS TargetField,...... FROM table]]>
     72     /// <returns></returns>
     73     private string GetBaseSql()
     74     {
     75         StringBuilder sb = new StringBuilder("SELECT");
     76 
     77         foreach (var item in this.Fields)
     78         {
     79             sb.AppendFormat(" {0},", item.ToString());
     80         }
     81 
     82         sb = sb.Remove(sb.Length - 1, 1);
     83 
     84         sb.Append(" FROM ");
     85         sb.Append(this.SourceTableName);
     86         return sb.ToString();
     87     }
     88 
     89     /// <summary>
     90     /// 获取sql查询条件
     91     /// </summary>
     92     /// <![CDATA[filter || PrimaryKey NOT IN (SELECT PrimaryKey FORM table WHERE filter)]]>
     93     /// <returns></returns>
     94     private string GetFilterSql()
     95     {
     96         if (!this.IsPaging)
     97         {
     98             return this.Filter;
     99         }
    100 
    101         StringBuilder sb = new StringBuilder();
    102         sb.AppendFormat("SELECT ROW_NUMBER() OVER(ORDER BY {0}) RowNo,{0} FROM {1}", this.PrimaryKey, this.SourceTableName);
    103 
    104         if (!string.IsNullOrWhiteSpace(this.Filter))
    105         {
    106             sb.Append(" WHERE " + this.Filter);
    107         }
    108 
    109         sb.Insert(0, string.Format("SELECT {0} FROM (", this.PrimaryKey));
    110         sb.AppendFormat(") T WHERE RowNo BETWEEN @StartIndex AND @EndIndex");
    111 
    112         return string.Format("{0} IN ({1})", this.PrimaryKey, sb.ToString());
    113     }
    114 }
    115 
    116 /// <summary>
    117 /// 字段类
    118 /// </summary>
    119 internal class FieldModel
    120 {
    121     /// <summary>
    122     /// 源字段名
    123     /// </summary>
    124     public string SourceFieldName { get; set; }
    125 
    126     /// <summary>
    127     /// 目标字段名
    128     /// </summary>
    129     public string TargetFieldName { get; set; }
    130 
    131     /// <summary>
    132     /// ToString
    133     /// </summary>
    134     /// <returns>\'SourceFieldName\' AS \'TargetFieldName\'" </returns>
    135     public override string ToString()
    136     {
    137         if (this.SourceFieldName.IndexOfAny(new char[] { \' \', \'(\' }) < 0)
    138         {
    139             //非表达式
    140             return string.Format("[{0}] AS \'{1}\'", SourceFieldName, TargetFieldName);
    141         }
    142         else
    143         {
    144             return string.Format("{0} AS \'{1}\'", SourceFieldName, TargetFieldName);
    145         }
    146     }
    147 }
  3. 解析XML文件
    XML解析可通过XmlSerializer直接反序列化为对象,此处只是为了温习XML解析方式,故采用此方法
     1 /// <summary>
     2 /// 载入数据库描述xml
     3 /// </summary>
     4 /// <returns></returns>
     5 private static DBCaptionModel LoadDBCaption()
     6 {
     7     DBCaptionModel model = new DBCaptionModel();
     8 
     9     XmlDocument doc = new XmlDocument();
    10     doc.Load(CONN_XML_PATH + "db_caption.xml");
    11 
    12     XmlNode root = doc.SelectSingleNode("root");
    13     //获取最大连接数
    14     model.MaxClients = root.SelectSingleNode("maxClients").GetAttribute<int>("value");
    15 
    16     //获取表描述
    17     XmlNodeList tables = root.SelectSingleNode("tables").SelectNodes("table");
    18     foreach (XmlNode node in tables)
    19     {
    20         model.Tables.Add(new TableModel
    21         {
    22             FileName = node.GetAttribute("filename"),
    23             Caption = node.GetAttribute("caption")
    24         });
    25     }
    26 
    27     return model;
    28 }
    29 
    30 /// <summary>
    31 /// 载入表描述xml
    32 /// </summary>
    33 /// <param name="fileName">表描叙xml文件名</param>
    34 /// <returns></returns>
    35 private static TableCaptionModel LoadTableCaption(string fileName)
    36 {
    37     XmlDocument doc = new XmlDocument();
    38     doc.Load(CONN_XML_PATH + fileName);
    39 
    40     TableCaptionModel model = new TableCaptionModel();
    41 
    42     XmlNode root = doc.SelectSingleNode("root");
    43     model.IsPaging = root.SelectSingleNode("isPaging").GetAttribute<bool>("value");
    44     if (model.IsPaging)
    45     {
    46         model.PageSize = root.SelectSingleNode("pageSize").GetAttribute<int>("value");
    47     }
    48     model.SourceTableName = root.SelectSingleNode("msTable").GetAttribute("value");
    49     model.TargetTableName = root.SelectSingleNode("myTable").GetAttribute("value");
    50     model.PrimaryKey = root.SelectSingleNode("primaryKey").GetAttribute("value");
    51     model.FixSql = root.SelectSingleNode("fixSql").GetAttribute("value");
    52 
    53     XmlNodeList fields = root.SelectSingleNode("fields").SelectNodes("field");
    54 
    55     foreach (XmlNode field in fields)
    56     {
    57         model.Fields.Add(new FieldModel
    58         {
    59             SourceFieldName = field.GetAttribute("msName"),
    60             TargetFieldName = field.GetAttribute("myName")
    61         });
    62     }
    63 
    64     return model;
    65 }

    Node.GetAttribute扩展方法,简化读取Node属性代码

     1 public static class XmlNodeExtension
     2 {
     3     /// <summary>
     4     /// 获取节点属性
     5     /// </summary>
     6     /// <param name="node">当前节点</param>
     7     /// <param name="attrName">属性名称</param>
     8     /// <returns></returns>
     9     public static string GetAttribute(this XmlNode node, string attrName)
    10     {
    11         if (node == null)
    12         {
    13             return null;
    14         }
    15         return ((XmlElement)node).GetAttribute(attrName);
    16     }
    17 
    18     /// <summary>
    19     /// 获取节点属性
    20     /// </summary>
    21     /// <param name="node">当前节点</param>
    22     /// <param name="attrName">属性名称</param>
    23     /// <returns></returns>
    24     public static T GetAttribute<T>(this XmlNode node, string attrName) where T : struct
    25     {
    26  

    以上是关于闲来无事,编写一个数据迁移小工具的主要内容,如果未能解决你的问题,请参考以下文章

    微信小程序代码片段

    博客转发小工具1

    博客转发小工具1

    Json解析工具类

    框架如何迁移? | X2Paddle 小工具教你玩转模型迁移

    C# 字符串处理小工具