做ETL的时候用到的数据同步更新代码

Posted 小泉的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了做ETL的时候用到的数据同步更新代码相关的知识,希望对你有一定的参考价值。

这里是用的从一个库同步到另一个库,代码如下

 

        private void IncrementalSyncUpdate(string fromConn, string toConn, Dictionary<string, string> sqlList)
        {
            var sw = new Stopwatch();

            using (var conn = new SqlConnection(fromConn))
            {
                if (conn.State != ConnectionState.Open) conn.Open();

                using (var cmd = new SqlCommand())
                {
                    try
                    {
                        foreach (var item in sqlList)
                        {
                            var sql = item.Value;

                            var tableName = item.Key;

                            sw.Restart();

                            cmd.CommandText = sql;
                            cmd.CommandType = CommandType.Text;
                            cmd.Connection = conn;

                            DataTable dt = new DataTable();

                            SqlDataAdapter sda = new SqlDataAdapter(cmd);

                            sda.Fill(dt);

                            if (dt.Rows.Count == 0) continue;//如果没有新增数据,跳过

                            foreach (DataRow row in dt.Rows)
                            {
                                row.SetModified();
                            }

                            int updateCount = 0;

                            using (var TmpConn = new SqlConnection(toConn))
                            {
                                if (TmpConn.State != ConnectionState.Open) TmpConn.Open();

                                using (var tmpCmd = new SqlCommand())
                                {
                                    tmpCmd.CommandText = "select * from Meb_" + tableName;
                                    tmpCmd.CommandType = CommandType.Text;
                                    tmpCmd.Connection = TmpConn;
                                    SqlDataAdapter tmpSda = new SqlDataAdapter(tmpCmd);

                                    SqlCommandBuilder scb = new SqlCommandBuilder(tmpSda);
                                    tmpSda.UpdateCommand = scb.GetUpdateCommand();
                                    updateCount = tmpSda.Update(dt);

                                }
                            }

                            sw.Stop();

                            Console.WriteLine("增量同步数据更新完成,表名:{0},数据{1}条,耗时{2}秒", tableName, updateCount, sw.Elapsed.TotalSeconds);
                        }
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine("增量同步数据同步更新失败," + e.ToString());
                    }
                }
            }
        }

这个sqllist是一个   Key:表名  Value:查询SQL  ,做增量同步查询的,主要根据时间戳来获取时间。

用SqlAdapter的Update方法来做更新,这里有个问题,在更新的DataTable里面的数据要对DataRow的RowState进行一个设置,如果是未设置,那么Update过后,数据库是没有变化的。

以上是关于做ETL的时候用到的数据同步更新代码的主要内容,如果未能解决你的问题,请参考以下文章

需求:公司需要做数据迁移同步,以下是几种常见的ETL工具选型对比

ETL全量单表同步简述

canal +RocketMQ实现MySQL与ElasticSearch数据同步

SQL Server -;; 基于表TIMESTAMP类型字段+NOLOCK脏读的ETL增量同步方案发现的数据遗漏问题

「开源」数据同步ETL工具,支持多数据源间的增、删、改数据同步

swift常用代码片段