数据库实现批量快速插入大量数据的六种方案

Posted 哥们要飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据库实现批量快速插入大量数据的六种方案相关的知识,希望对你有一定的参考价值。

一、(mysql)通过函数/存储过程

1、链接

https://www.jb51.net/article/207999.htm

https://blog.csdn.net/FloraCHY/article/details/117792903

2、代码

-- 进入数据库
use test;
-- 显示所有表
show tables;
-- 创建majors表
create table majors(id int, major varchar(255));
-- 定义结束符$
delimiter "$";
-- 创建存储过程,定义存储方法
create procedure batchInsert(in args int)
begin
declare i int default 1;
-- 开启事务(重要!不开的话,100w数据需要论天算)
start transaction;
while i <= args do
insert into majors(id,major) value(i,concat("软件工程-",i));
set i = i+ 1;
end while;
commit;
end
$
 
-- 调用函数,生成数据
-- 先生成10w条试试,同时输入$, 回车执行
call batchInsert(100000);
$

3、性能

10000条数据用了0.9s

100000条,5s执行完

100w条数据用了58s

二、通过jdbc的批量插入语句(add/executeBatch)

1、链接

http://t.zoukankan.com/lizm166-p-7890168.html

2、代码

//获取要设置的Arp基准的List后,插入Arp基准表中    
    public boolean insertArpStandardList(List<ArpTable> list) 
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        //MySql的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。
        //优化插入性能,用JDBC的addBatch方法,但是注意在连接字符串加上面写的参数。
        //例如: String connectionUrl="jdbc:mysql://192.168.1.100:3306/test?rewriteBatchedStatements=true" ;
        String sql = "insert into arp_standard(guid, devicebrand, devicename, deviceip, ipaddress, " +
                     "macaddress, createtime) values(?,?,?,?,?,?,?)";
        try
            conn = DBConnection.getConnection();
            ps = conn.prepareStatement(sql);
            //优化插入第一步设置手动提交  
            conn.setAutoCommit(false); 
            int len = list.size();
            for(int i=0; i<len; i++) 
                ps.setString(1, list.get(i).getGuid());
                ps.setString(2, list.get(i).getDeviceBrand());
                ps.setString(3, list.get(i).getDeviceName());
                ps.setString(4, list.get(i).getDeviceIp());
                ps.setString(5, list.get(i).getIpAddress());
                ps.setString(6, list.get(i).getMacAddress());
                ps.setString(7, list.get(i).getCreateTime());
                //if(ps.executeUpdate() != 1) r = false;    优化后,不用传统的插入方法了。
                //优化插入第二步插入代码打包,等一定量后再一起插入。
                ps.addBatch(); 
                //if(ps.executeUpdate() != 1)result = false;
                //每200次提交一次 
                if((i!=0 && i%200==0) || i==len-1)//可以设置不同的大小;如50,100,200,500,1000等等  
                    ps.executeBatch();  
                    //优化插入第三步提交,批量插入数据库中。
                    conn.commit();  
                    ps.clearBatch();//提交后,Batch清空。
                
            
         catch (Exception e) 
            System.out.println("MibTaskPack->getArpInfoList() error:" + e.getMessage());
            return false;   //出错才报false
         finally 
            DBConnection.closeConection(conn, ps, rs);
        
        return true;
    

三、通过多线程执行jdbc过程

1、链接

http://t.zoukankan.com/fangts-p-6813515.html

2、代码

package tenThreadInsert;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
 
public class MyThread extends Thread
                public void run() 
                     String url = "jdbc:mysql://127.0.0.1/teacher"; 
                     String name = "com.mysql.jdbc.Driver"; 
                     String user = "root"; 
                     String password = "123456"; 
                    Connection conn = null; 
                    try 
                        Class.forName(name);
                        conn = DriverManager.getConnection(url, user, password);//获取连接 
                        conn.setAutoCommit(false);//关闭自动提交,不然conn.commit()运行到这句会报错
                     catch (ClassNotFoundException e1) 
                        e1.printStackTrace();
                     catch (SQLException e) 
                        e.printStackTrace();
                    
                // 开始时间
                Long begin = new Date().getTime();
                // sql前缀
                String prefix = "INSERT INTO test_teacher (t_name,t_password,sex,description,pic_url,school_name,regist_date,remark) VALUES ";
                try 
                    // 保存sql后缀
                    StringBuffer suffix = new StringBuffer();
                    // 设置事务为非自动提交
                    conn.setAutoCommit(false);
                    // 比起st,pst会更好些
                    PreparedStatement  pst = (PreparedStatement) conn.prepareStatement("");//准备执行语句
                    // 外层循环,总提交事务次数
                    for (int i = 1; i <= 10; i++) 
                        suffix = new StringBuffer();
                        // 第j次提交步长
                        for (int j = 1; j <= 100000; j++) 
                            // 构建SQL后缀
                            suffix.append("('" +i*j+"','123456'"+ ",'男'"+",'教师'"+",'www.bbb.com'"+",'Java大学'"+",'"+"2016-08-16 14:43:26"+"','备注'" +"),");
                        
                        // 构建完整SQL
                        String sql = prefix + suffix.substring(0, suffix.length() - 1);
                        // 添加执行SQL
                        pst.addBatch(sql);
                        // 执行操作
                        pst.executeBatch();
                        // 提交事务
                        conn.commit();
                        // 清空上一次添加的数据
                        suffix = new StringBuffer();
                    
                    // 头等连接
                    pst.close();
                    conn.close();
                 catch (SQLException e) 
                    e.printStackTrace();
                
                // 结束时间
                Long end = new Date().getTime();
                // 耗时
                System.out.println("100万条数据插入花费时间 : " + (end - begin) / 1000 + " s"+"  插入完成");
      

 测试代码

package tenThreadInsert;
 
public class Test 
 
    public static void main(String[] args) 
        for (int i = 1; i <=10; i++) 
              new MyThread().start();
            
    
 

四、一次性插入多条记录

1、原理

MySQL:

INSERT INTO Persons (LastName, Address) VALUES ('Wilson', 'Champs-Elysees'),('Gates', 'Champs-Elysees')

Oracle:

insert into 表名 (字段1)

select '1' from dual

union all

select '2' from dual

2、代码

(1)调用

public static Boolean insertManyByOne(int num) 
        String sql = GenSqlUtil.genInsManySql(num);
        // System.out.println(sql);
        jdbcUtils.insertMany(sql);
        System.out.println("共插入" + num + "条数据");
        return true;
    


public static String genInsManySql(int num) 
        String sql = "INSERT INTO TEST.\\"ABANK\\"\\n ";
        for (int i = 0; i < num; i++) 
            sql = sql.concat("select '1', 'CH', '9999', 'Zürcher Kantonalbank', " +
                    "'ZKBKCHZZ80A', ' ', TO_DATE('2009-11-10 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), " +
                    "TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), " +
                    "TO_DATE('2017-07-12 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), " +
                    "'ADMIN', TO_DATE('1599-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'), " +
                    "'ADMIN', TO_TIMESTAMP('2021-04-23 08:54:05.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), " +
                    "TO_TIMESTAMP('"+ dateFormat.format(calendar.getTime()) +
                    "', 'SYYYY-MM-DD HH24:MI:SS:FF3'), " +
                    "HEXTORAW('"+ RandNumGenUtil.genDefLenStr(15) +"') from dual");
            if (i != num -1) 
                sql = sql.concat("\\n union all \\n");
            
        
        return sql;
    

(2)jdbcutils

package com.boulderaitech.utils;

import java.sql.*;
import java.util.Arrays;

public class JDBCUtil 
    private String user;
    private String pass;
    private String url;

    private Connection conn = null;//连接对象
    private ResultSet rs = null;//结果集对象
    private Statement sm = null;

    /**
     * 构造函数获得数据库用户名和密码
     *
     * @param user
     * @param pass
     */
    public JDBCUtil(String user, String pass) 
        this.user = user;
        this.pass = pass;
        this.url = "jdbc:oracle:thin:@//172.16.5.162:1521/helowin";
    

    /**
     * 连接数据库
     *
     * @return
     */
    public Connection createConnection() 
        String sDBDriver = "oracle.jdbc.driver.OracleDriver";
        try 
            Class.forName(sDBDriver).newInstance();
            conn = DriverManager.getConnection(url, user, pass);
         catch (Exception e) 
            System.out.println("数据库连接失败");
            e.printStackTrace();
        
        return conn;
    

    /**
     * 关闭数据库
     *
     * @param conn
     */
    public void closeConnection(Connection conn) 
        try 
            if (conn != null) 
                conn.close();
            
         catch (Exception e) 
            System.out.println("数据库关闭失败");
            e.printStackTrace();
        
    

    /**
     * 插入数据
     *
     * @param insert 插入语句
     * @return
     */
    public int insert(String insert) 
        conn = createConnection();
        //String insert = "insert into t_department values('D004','金融部')";
        int re = 0;
        try 
            conn.setAutoCommit(false);//事物开始

            sm = conn.createStatement();
            re = sm.executeUpdate(insert);
            if (re < 0)                //插入失败
                conn.rollback();      //回滚
                sm.close();
                closeConnection(conn);
                return re;
            
            conn.commit();            //插入正常
            sm.close();
            closeConnection(conn);
            return re;
         catch (Exception e) 
            e.printStackTrace();
        
        closeConnection(conn);
        return 0;
    

    /**
     * 批量插入数据
     */
    public int insertBatch(String[] sql) 
        conn = createConnection();
        //String insert = "insert into t_department values('D004','金融部')";
        int re = 0;
        try 
            conn.setAutoCommit(false);//事务开始
            sm = conn.createStatement();
            Arrays.stream(sql).forEach(x->
                try 
                    sm.executeUpdate(x);
                 catch (SQLException e) 
                    e.printStackTrace();
                
            );
            conn.commit();            //插入正常
            sm.close();
            closeConnection(conn);
            return re;
         catch (Exception e) 
            e.printStackTrace();
        
        closeConnection(conn);
        return 0;
    

    /**
     * 查询语句
     * 返回结果集
     *
     * @param select
     * @return
     */
    public ResultSet selectSql(String select) 
        conn = createConnection();
        try 
            sm = conn.createStatement();
            rs = sm.executeQuery(select);
            return rs;
         catch (Exception e) 
            e.printStackTrace();
        
        return null;
    

    /**
     * 根据结果集输出
     *
     * @param rs
     */
    public void printRs(ResultSet rs) 
        int columnsCount = 0;
        boolean f = false;
        try 
            if (!rs.next()) 
                return;
            
            ResultSetMetaData rsmd = rs.getMetaData();
            columnsCount = rsmd.getColumnCount();//数据集的列数
            for (int i = 0; i < columnsCount; i++) 
                System.out.print(rsmd.getColumnLabel(i + 1) + "/n"); //输出列名
            
            System.out.println();

            while (!f) 
                for (int i = 1; i <= columnsCount; i++) 
                    //System.out.print(rs.getString(i)+"/t");
                    //逻辑处理
                    String name = rs.getString("NAME");

                    System.out.print(rs.getString("NAME") + "/n");
                
                System.out.println();
                if (!rs.next()) 
                    f = true;
                
            
            rs.close();
         catch (Exception e) 
            e.printStackTrace();
        
        closeConnection(conn);
    

    /**
     * 插入数据
     *
     * @param update 更新语句
     * @return
     */
    public int update(String update) 
        conn = createConnection();
        //String insert = "insert into t_department values('D004','金融部')";
        int re = 0;
        try 
            conn.setAutoCommit(false);//事物开始

            sm = conn.createStatement();
            re = sm.executeUpdate(update);
            if (re < 0)                //插入失败
                conn.rollback();      //回滚
                sm.close();
                closeConnection(conn);
                return re;
            
            conn.commit();            //插入正常
            sm.close();
            closeConnection(conn);
            return re;
         catch (Exception e) 
            e.printStackTrace();
        
        closeConnection(conn);
        return 0;
    

    public int insertMany(String sql) 
        conn = createConnection();
        int re = 0;
        try 
            conn.setAutoCommit(false);//事物开始

            sm = conn.createStatement();
            re = sm.executeUpdate(sql);
            if (re < 0)                //插入失败
                conn.rollback();      //回滚
                sm.close();
                closeConnection(conn);
                return re;
            
            conn.commit();            //插入正常
            sm.close();
            closeConnection(conn);
            return re;
         catch (Exception e) 
            e.printStackTrace();
        
        closeConnection(conn);
        return 0;
    

五、通过定时器实现定时执行

public static Boolean insertBatchFixTime(int numOfInsert, int timePerEpoch) 
        Timer timer = new Timer();
        timer.schedule(new TimerTask() 
            @Override
            public void run() 
                insertManyByOne(numOfInsert);
            
        , 0L, timePerEpoch * 1000L);
        System.out.println("当前线程:" + Thread.currentThread().getName() + " 当前时间" + LocalDateTime.now());
        return true;
    

六、通过循环实现批量插入

public static Boolean insertBatchFixCircle(int numOfEachInsert, int numOfEpoch) 
        LocalDateTime start = LocalDateTime.now();
        for (int i = 0; i < numOfEpoch; i++) 
            insertManyByOne(numOfEachInsert);
        
        System.out.println("共插入" + numOfEachInsert * numOfEpoch+"条数据");
        LocalDateTime end = LocalDateTime.now();
        System.out.println("共耗时" + Duration.between(start, end).toMillis() + "ms");
        return true;
    

redis五种数据结构与六种底层实现

1.redis的五种对外暴露的数据结构

1.string 字符串
2.hash 哈希结构
3.list 列表
4. set 集合
5. sorted set 有序集合

2.redis数据结构的六种底层实现

  1. sds (simple dynamic string) 简单动态字符串
  2. dict 字典
  3. intset 整数集合
  4. skiplist 跳表
  5. ziplist 压缩表
  6. quicklist 快速列表

3.redisObject

redisObject本质上是对外暴露的数据结构与底层实现之间的一个桥梁,相关的代码如下,redis版本3.2。

#define OBJ_STRING 0
#define OBJ_LIST 1
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4

/* Objects encoding. Some kind of objects like Strings and Hashes can be
 * internally represented in multiple ways. The 'encoding' field of the object
 * is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */

#define LRU_BITS 24
typedef struct redisObject 
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
    int refcount;
    void *ptr;
 robj;

最开始的5种类型,就是redis对外暴露的5种数据结构。而下面的10种类型,则为redis底层存储的6种结构,当然其中有一部分是以前版本现在已经不使用了。

从源码中可以看出,一个redisobj包含有5个字段

  1. type 即前面所说对外暴露的5种数据类型
  2. encoding 对象内部编码方式,即我们前面提到的六种底层实现(有的是历史版本,目前已经废弃)
  3. lru淘汰算法
  4. refcout 引用计数
  5. ptr 数据指针,指向真正的存储数据。

4.SDS

SDS是redis中使用非常广泛的一种底层数据结构,着重看一下SDS的相关内容。

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
;
struct __attribute__ ((__packed__)) sdshdr8 
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
;
struct __attribute__ ((__packed__)) sdshdr16 
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
;
struct __attribute__ ((__packed__)) sdshdr32 
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
;
struct __attribute__ ((__packed__)) sdshdr64 
    //记录buf数组中已使用的字节数量
    uint64_t len; 
    //分配的buf数组长度,不包括头和空字符结尾 
    uint64_t alloc;
    // 标志位, 最低3位表示类型,另外5个位没有使用
    unsigned char flags; 
    char buf[];
;

SDS有5中header类型,之所以要定义不同的类型,是为了让不同的字符串使用不同长度的header,达到节省内存的目的。

而在SDS的结构体中,有以下4个字段

  1. len 记录buf数组中已使用的字符数量
  2. alloc 分配的buf数组长度,不包括头与结尾的空串
  3. flags 最低的三位表示header类型,对应下面的五种类型
#define SDS_TYPE_5  0
#define SDS_TYPE_8  1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4
  1. buf[] 为字符数组,存放真正的字符串数据

以"redis"这个字符串为例,我们看看实际是如何存储的

SDS同样遵循C语言中字符串以’\\0’结尾的观里,并且这个空串不计算在len的长度中,另外还为该串额外分配了一个字节的空间,包括添加到末尾等操作,全由SDS中的函数自己完成,不需要调用者进行额外的操作。这样做的好处,就可以直接使用C语言中一部分字符串函数库中的函数。

那么使用SDS,相比C语言中的字符串,好处在哪里呢?
结合Redis设计与实现,总结有如下几点

1.可以在常数时间内获取字符串的长度

因为c语言中的字符串不记录字符串长度,要想获得长度必须遍历字符串,时间复杂度为O(N)。而SDS中有记录字符串长度的字段len,可以在O(1)时间内就获取字符串长度。

2.杜绝缓冲区溢出

C字符串不记录自身的长度,每次增长或缩短一个字符串,都要对底层的字符数组进行一次内存重分配操作。如果是拼接append操作之前没有通过内存重分配来扩展底层数据的空间大小,就会产生缓存区溢出;如果是截断trim操作之后没有通过内存重分配来释放不再使用的空间,就会产生内存泄漏。
而在redis中通过alloc属性记录总的分配字节数量。通过未使用空间,SDS实现了空间预分配和惰性空间释放两种优化的空间分配策略,解决了字符串拼接和截取的空间问题。

3.存放二进制数据

C 字符串中的字符必须符合某种编码(比如 ASCII), 并且除了字符串的末尾之外, 字符串里面不能包含空字符, 否则最先被程序读入的空字符将被误认为是字符串结尾。这样使得C语言中的字符串只能保存纯文本而无法保存其中像视频图片这种二进制数据。

而在SDS中,是以处理二进制的方式来处理存放在buf数组中的数据,因为是二进制,所以不会对里面的数据类型有任何限制,也可以存放视频图像声音这类非文本数据。

以上是关于数据库实现批量快速插入大量数据的六种方案的主要内容,如果未能解决你的问题,请参考以下文章

MyBatis动态批量插入更新Mysql数据库的通用实现方案

redis五种数据结构与六种底层实现

Hibernate查询的六种方式

解决数据架构难点数据分布的六种策略

详解前端异步编程的六种方案

JavaScript的六种数据类型