c#中往mysql里批量插入上万条数据,有比较高效的方法吗

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c#中往mysql里批量插入上万条数据,有比较高效的方法吗相关的知识,希望对你有一定的参考价值。

首先, 插入上万条数据,对于数据库来说并不是“很大”的工作量,一般配置的笔记本电脑都可以在1分钟内完成。 所以最简单、最灵活的办法还是写SQL语句。

如果不希望DB编译器每次执行都编译SQL的话,可以使用存储过程,直接调用,性能上会好很多。也比较简单。
(几万条数据怎么地也得要时间去处理,所以不可能特别快的。)

如果由于各种原因,导致这个插入还是很慢, 而且你的mysql又是5.0以上版本的话,可以使用BulkCopy来进行批量操作。
BulkCopy的原理就是Client直接把一个数组(DataTable)传给DB,然后传入表名,所有的编译、操作都由DB自己完成,效率很高。
引用MySql.Data.dll , 调用MysqlBulkCopy函数即可。

这个函数在处理海量数据插入的时候效率尤为明显, 小量数据反而没什么优势,而且由于传入的DataTable格式必须和表的字段一模一样(空的列也要传进去),导致C#要写很多代码来构造这个数组,所以要你自己权衡用还是不用。
我在自己的电脑上批量插入一亿条数据,Insert写法大概需要1小时,BulkCopy大概只需要5分钟。
参考技术A

这几百个数据我组合成上述一条语句,20多个账户多个采集点我一共生成上述语句300--800多条,每条的数据项300-500个,所以每次插入的数据共计10000--50000条。

我把每条insert语句都保存在一个SQLStringList之中,又 在网上找点资料,采用了事务处理方式,本来我的事务方式是所有sql语句放在一个事务里,但有热心朋友告知“每当执行1000条DBCommand就提交(Commit)事务,然后再次开启事务,这样比较好。把过多的命令放在一个事务中,一旦超过物理内存分配限制,你的程序会变得很慢很慢。”

所以我后来修改了一下,每500条语句重启一次事务。c#代码如下:

/// <summary>
        /// 执行多条SQL语句,实现数据库事务。
        /// </summary>mysql数据库
        /// <param name="SQLStringList">多条SQL语句</param>
        public static void ExecuteSqlTran(List<string> SQLStringList)
        
            using (MySqlConnection conn = new MySqlConnection(MySqlHelper.ConnStr))
            
                conn.Open();
                MySqlCommand cmd = new MySqlCommand();
                cmd.Connection = conn;
                MySqlTransaction tx = conn.BeginTransaction();
                cmd.Transaction = tx;
                try
                
                    for (int n = 0; n < SQLStringList.Count; n++)
                    
                        string strsql = SQLStringList[n].ToString();
                        if (strsql.Trim().Length > 1)
                        
                            cmd.CommandText = strsql;
                            cmd.ExecuteNonQuery();
                        
                       //后来加上的
                       if (n > 0 && (n % 500 == 0 || n == SQLStringList.Count - 1))
                        
                            tx.Commit();
                            tx = conn.BeginTransaction();
                        
                    
                    //tx.Commit();//原来一次性提交
                
                catch (System.Data.SqlClient.SqlException E)
                
                    tx.Rollback();
                    throw new Exception(E.Message);
                
            
        

参考技术B 用参数法(避免Database每次插入都要解析SQL)。

多条数据批量插入优化方案

业务背景描述:

? 主数据同步:调用主数据查询接口,返回json字符串,包含上万条数据信息。将所有数据信息提取出来并插入指定数据表中。

? tips:

1.要求数据同步接口为定时方法(比如每晚12点调用一次主数据接口查询主数据),进行数据的同步更新
2.主数据基本不会发生变更,每天可能会有少量更新和新增信息

此业务比较简单,然后之前的代码是这样实现

调用接口后获得主数据信息--> json 字符串,然后转为json对象,获取所有主数据信息
然后将主数据信息转为json数组
到这里json数组中每一个元素就是要同步的数据,大概有上万条
然后遍历json数组,取出每个json数组的id,根据id去数据库中查询是否已经存在此条数据
(id为唯一主键)
然后进行判断,如果查到此数据,说明已经存在,则执行修改操作
如果没有查到,说明此数据之前不存在则执行新增操作

问题:

数据不算多,但是进行测试的时候,使用上述方法,此接口执行了5分钟!!!
原因分析:
	有多少条数据就遍历多少次,上万条数据不算多,已经执行了5分钟,如果数据大批量则会执行更长的时间。并且每次遍历的逻辑比较冗余,先是去数据库中查是否存在,存在则执行修改
	
	可想而知:
		第一次同步的时候,数据表中完全是空的,所以全都是插入操作
		以后的每次同步,基本都是修改操作,因为之前提到过,每天主数据可能只有少量修改	  和新增
	所以基本后面每次调用此接口都是修改,效率可想而知

虽然此接口是凌晨调用,前人做的时候可能觉得效率快慢无所谓

但是此数据用到的频率过高,比如其他接口开发的时候需要最新的主数据,就需要写个测试接口去更新一下主数据,但是更新了5分钟 实在太烦,所以很有必要优化一下的

正好业务需要,在其他项目中也要写一遍主数据同步的逻辑,所以直接过了优化

优化的思路过程,如下

一、命中数据优化

每次都遍历都去查询一次数据,然后再做修改操作。

先从数据表中查询出所有的id,因为id是唯一id,所以将查出的所有id用Set保存起来,

同时也利用set集合查询快的优点,然后同样遍历数组

只不过遍历的时候不是根据id去数据库中查询了,而是去set集合中查询

//伪代码----------------------------------------
//查出所有的id
HashSet<Integer> idSet = userMapper.findAllId();

for(Json json:JsonArray){
        
        if(idSet.contains(json.get("id"))){
            //执行修改

            //从set集合中删除此元素
            idSet.remove(json.get("id"));
        }
        //执行新增
        
    }

这样只是将数据库命中数据做了优化,但还是要每次遍历都执行修改

经测试没有显著提升

二、批量插入优化1

想过做批量修改的优化,但是可能会得不偿失,效率大可能性也不会提升

所以从根本上解决问题,每次都将数据表中的数据进行删除,然后全部执行新增即可

这样简单直接,因为本身修改就很慢

并且有事务的支持,即使删除后 新增数据失败,也会进行回滚

可以保证数据的有效性

所以接下来的优化都是新增的优化

新增优化无非就是

1.使用批量新增, insert into 表名 values(值1,值2..),(),(),....减少连接数据库的次数
2.插入时候保证主键的有序性,可以提高插入效率(因为主数据中的id本身就是无序的,再排序感觉没必要)
3.使用原生jdbc进行插入,因为框架本来封装的逻辑会影响效率

结合各种原因,选择使用第一种优化

伪代码;

//定义集合用于批量新增
    List<User> list = new LinkedList<>();

    for(Json json:JsonArray){
        
        //遍历将数据封装进实体中
        list.add(new User(json));
        
        while (list.size()==500){
            //当集合中满500个元素的时候,执行批量新增
            userMapper.add(list);
            
            //新增完成将集合清空,用于下一次批量新增
            list.clear();
        }

    }
    
    //遍历完成之后,如何处理不满500条的数据?

tips:

1.在User中已经做了赋值操作
2.因为集合要进行频繁的插入操作,所以选择插入数据较快的LinkedList
3.这样每500个元素批量插入一次,最后肯定有不满500条的数据没有执行插入,如何处理?

三、批量插入优化2

关于处理最后一次不满500条的数据,

想过几个方案,比如根据总条数和每次插入的条数计算出总共处理的次数

然后最后一次插入的时候做一些处理

也想过在虚拟机退出的时候,做一些处理操作,见下面代码

public static void main(String[] args) {

        //模拟数据
        LinkedList<Integer> list = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }

        //定义集合用于批量新增
        List<Integer> addList = new LinkedList<>();

        for (Integer i : list) {
            addList.add(i);
            while (addList.size()==3){
                System.out.println("addList = " + addList);
                addList.clear();
            }
        }

        //虚拟机退出的时候,处理集合中残余数据
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("addList = " + addList)
        ));
    }

打印结果:

addList = [0, 1, 2]
addList = [3, 4, 5]
addList = [6, 7, 8]
addList = [9]

这样确实能处理到所有的数据,我可真是异想天开,虚拟机退出这个不适用啊

其实很简单,马上就想到了方案

每次都是满500条才进行批量新增,然后批量新增成功才执行清空集合操作
那么是不是不满500条就不会新增,也就不会清空集合
并且用于批量新增的集合是定义在循环外面的
所以循环结束后集合中肯定有残余数据

见下面代码:

public static void main(String[] args) {

        //模拟数据
        LinkedList<Integer> list = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }

        //定义集合用于批量新增
        List<Integer> addList = new LinkedList<>();

        for (Integer i : list) {
            addList.add(i);
            while (addList.size() == 3) {
                System.out.println("addList = " + addList);
                addList.clear();
            }
        }

        System.out.println("addList = " + addList);

    }

效果是一样的,想复杂了

所以处理起来就简单多了

	//定义集合用于批量新增
    List<User> list = new LinkedList<>();

    for(Json json:JsonArray){
        
        //遍历将数据封装进实体中
        list.add(new User(json));
        
        while (list.size()==500){
            //当集合中满500个元素的时候,执行批量新增
            userMapper.add(list);
            
            //新增完成将集合清空,用于下一次批量新增
            list.clear();
        }
    }
    
    //遍历完成之后,处理不满500条的数据
	userMapper.add(list);

因为本身就是比较简单的逻辑,稍微动一点心思,几行代码能大大提高效率

最后经测试,之前旧的方案要执行5分钟

此优化之后的方案只要执行 20 - 30秒,快的时候可以到15 - 18秒

效果显而易见!!!

四、批量插入优化3

为什么还有优化3呢,因为是这样的,下面呢有一个需求 要求要将一个集合中的所有元素插入到数据表中,该集合的元素共有一万多个

用上面的方法也可以实现,并且也是几行代码的事情,也比较简单

但是!!!

本着不安分,闲着蛋疼的心思,想着还能不能继续优化?

上述的方案实现起来确实方便,但是同样要去遍历一万多次,

如果数据量大了 总感觉这样不妥?怎么办呢?

先看下面代码

package com.liqiliang.ssm.service.impl;

@Service
@Transactional
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @Override
    public boolean add() {

        List<User02> list = new ArrayList<>();

        //模拟10000条数据
        for (int i = 1; i <= 10000; i++) {
            list.add(new User02(i, "a"));
        }

        System.out.println("list.size() = " + list.size());


        int count = 3000;                   //一个线程处理3000条数据
        int listSize = list.size();         //数据集合大小
        int runSize = listSize%count==0?listSize/count:listSize/count+1;  //开启的线程数(处理的次数)
        List<User02> newlist = null;       //存放每个线程的执行数据

        ExecutorService executor = Executors.newFixedThreadPool(runSize);      //创建一个线程池,数量和开启线程的数量一样

        //循环创建线程
        for (int i = 0; i < runSize; i++) {
            //计算每个线程执行的数据
            if ((i + 1) == runSize) {
                int startIndex = (i * count);
                int endIndex = list.size();
                newlist = list.subList(startIndex, endIndex);
            } else {
                int startIndex = (i * count);
                int endIndex = (i + 1) * count;
                newlist = list.subList(startIndex, endIndex);
            }

            //调用方法处理数据
            method(newlist,executor);

        }

        //执行完关闭线程池
        executor.shutdown();


        return true;
    }


    private void method(List<User02> list,ExecutorService executor) {

        Thread thread = new Thread(() -> {

            userMapper.add(list);

            //打印出每次处理的数据
            System.out.println("list = " + list);
        });

        //执行线程
        executor.execute(thread);

    }
}

就不多啰嗦了,直接说明结果吧

上述方法 用postman调用  耗时1307ms

技术图片

tips:

这是将缓存清除之后 测得的数据,如果不清除缓存,执行时间更短,有5ms和21ms,肯定不能作为参考数据

不进行任何优化的遍历方法,一条一条插入

@Override
    public boolean add() {

        List<User02> list = new ArrayList<>();

        //模拟10000条数据
        for (int i = 1; i <= 10000; i++) {
            list.add(new User02(i, "a"));
        }

        System.out.println("list.size() = " + list.size());


        for (User02 user02 : list) {
            userMapper.addOne(user02);
        }
                return true;
    }

耗时:29.50s

技术图片

效果显而易见,简单阐述下优化思路吧

首先有一个一万个元素的集合需要将其中所有数据插入到数据库表中
不希望进行全部遍历
然后也是分次插入去实现,每次肯定是批量插入,这种插入最快
但是mysql一次最多可以批量插入多少条数据,
或者一次批量插入多少条数据效率是最高的这个没做考量
(一次批量插入3000条这个是可以的)

知道了每次是具体怎么实现的时候就是分几次的问题了,这个简单
10000条,每次处理3000条的话,就要处理4次
计算公式:总条数/每次处理的次数,看能不能除尽就能判断

知道了处理几次之后,然后去遍历处理的次数
每遍历一次都是做一次数据的处理
遍历第一次,处理第0个到第3000个元素
遍历第二次,处理第3000个到第6000个元素
遍历第三次,处理第6000个到第9000个元素
遍历第四次,处理第9000个到第10000个元素

------然后就有了上述的代码-----

前面几次简单,只不过最后一次遍历的时候,需要做一下处理

然后这样遍历就只需要遍历处理次数,然后每次遍历的时候,对总集合数据进行subList对集合切割即可

然后每次处理,可以用多线程,处理一次开启一个线程处理

----线程池创建线程数就是处理的次数----

每一次处理的时候,将处理的数据和线程池对象传递给处理方法
处理方法 开启一个线程-->处理数据-->线程池执行线程

最后遍历结束之后关闭线程池即可

结论:

多线程的优化方法代码相比来说要复杂了点,但是个人感觉效率高一点

优化2的代码更少,也更简单,可以实现,也更好理解一点

但是至于两种方案到底哪种效率更高一些,这个没有做对比,因为感觉没那么重要,重要的是解决这个问题的思想,个人感觉解决问题的思路才是最重要的!!!

只是模拟了一万条数据做了小demo进行简单测试,优化2已经到生产中使用

并且这都是数据量少的情况,数据量大的话也应该会有更好的处理方法

以上为个人总结的3种优化方法,本人不才,感谢浏览,仅供参考

以上是关于c#中往mysql里批量插入上万条数据,有比较高效的方法吗的主要内容,如果未能解决你的问题,请参考以下文章

mysql大批量插入数据的正确做法

mysql批量插入数据

在 MySQL 中插入 5 万条记录

C#中往SQLServer插入数据时遇到BUG

批量向MySQL导入1000万条数据的优化

如何快速安全的插入千万条数据?