java大批量数据导入(MySQL)
Posted 禁忌夜色153
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java大批量数据导入(MySQL)相关的知识,希望对你有一定的参考价值。
© 版权声明:本文为博主原创文章,转载请注明出处
最近同事碰到大批量数据导入问题,因此也关注了一下。大批量数据导入主要存在两点问题:内存溢出和导入速率慢。
内存溢出:将文件中的数据全部取出放在集合中,当数据过多时就出现Java内存溢出,此时可通过调大JVM的最大可用内存(Xmx)解决,
但终究不是王道。
mysql支持一条SQL语句插入多条记录的操作,并且效率比单条插入快的不是一点点;但是MySQL一次可接受的数据包大小
也是有限制的,当一次插入过多时也可能造成数据包内存溢出,此时可通过调大MySQL的max_allowed_packet 解决,
但也不是王道。
导入速率慢:单条插入就不用考虑了,因此考虑一条SQL语句插入多条记录,
根据上述所说还应控制好一条插入的数据大小不能超过max_allowed_packet 的配置。
下面比较了用PreparedStatement和直接拼接SQL两种批量插入的方式的速率(一次插入1w条)
package org.javaio.CSV; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.text.SimpleDateFormat; import java.util.Date; import com.mysql.jdbc.Connection; /** * 导入大批量CSV文件 * */ public class Test { /** * jdbc所属,暂不使用 */ private final static String url = "jdbc:mysql://localhost:3306/demo_test?useSSL=true&characterEncoding=utf8"; private final static String name = "root"; private final static String pwd = "20121221"; private static Connection conn; private static PreparedStatement ps; /** * 解析csv文件并插入到数据库中,暂不使用(jdbc) * * @param args * * @throws Exception */ public static void main(String[] args) throws Exception { Test test = new Test(); // psBatch 时间统计 - 开始 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String startTime = sdf.format(new Date()); System.out.println("psBatch 开始时间为:" + startTime); System.out.println("psBatch 开始执行..."); // 使用PreparedStatement批量插入 int idx = test.psBatch(); // 统计时间 - 结束 System.out.println("psBatch 执行完成,共插入" + idx + "条数据"); String endTime = sdf.format(new Date()); System.out.println("psBatch 结束时间为:" + endTime); System.out.println(); // 时间统计 - 开始 startTime = sdf.format(new Date()); System.out.println("sqlBatch 开始时间为:" + startTime); System.out.println("sqlBatch 开始执行..."); // 使用SQL语句批量插入 idx = test.sqlBatch(); // 统计时间 - 结束 System.out.println("sqlBatch 执行完成,共插入" + idx + "条数据"); endTime = sdf.format(new Date()); System.out.println("sqlBatch 结束时间为:" + endTime); } /** * 使用PreparedStatement批量插入 * * @return * * @throws Exception */ private int psBatch() throws Exception { int idx = 0;// 行数 try { // 读取CSV文件 FileInputStream fis = new FileInputStream("C:/Users/chen/Desktop/data/ceshi .csv"); InputStreamReader isr = new InputStreamReader(fis, "UTF-8"); BufferedReader br = new BufferedReader(isr); String line;// 行数据 String[] column = new String[4];// 列数据 // 获取数据库连接 conn = getConnection(); // 设置不自动提交 conn.setAutoCommit(false); // SQL String sql = "insert into test (name, `desc`, column1, column2, column3, column4) " + "values (?, ?, ?, ?, ?, ?)"; ps = conn.prepareStatement(sql); while ((line = br.readLine()) != null) {// 循环读取每一行 idx++;// 计数 column = line.split(","); ps.setString(1, column[0]); if (column.length >= 2 && column[1] != null) { ps.setString(2, column[1]); } else { ps.setString(2, ""); } if (column.length >= 3 && column[2] != null) { ps.setString(3, column[2]); } else { ps.setString(3, ""); } if (column.length >= 4 && column[3] != null) { ps.setString(4, column[3]); } else { ps.setString(4, ""); } ps.setString(5, "type"); ps.setString(6, "1"); ps.addBatch(); if (idx % 10000 == 0) { ps.executeBatch(); conn.commit(); ps.clearBatch(); } } if (idx % 10000 != 0) { ps.executeBatch(); conn.commit(); ps.clearBatch(); } } catch (Exception e) { System.out.println("第" + idx + "前一万条数据插入出错..."); } finally { try { if (ps != null) { // 关闭连接 ps.close(); } if (conn != null) { conn.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return idx; } /** * 使用sql语句批量插入 * * @return * * @throws Exception */ private int sqlBatch() { int idx = 0;// 行数 try { // 读取CSV文件 FileInputStream fis = new FileInputStream("C:/Users/chen/Desktop/data/ceshi .csv"); InputStreamReader isr = new InputStreamReader(fis, "UTF-8"); BufferedReader br = new BufferedReader(isr); String line;// 行数据 String[] column = new String[4];// 列数据 // 获取数据库连接 conn = getConnection(); // SQL StringBuffer sql = new StringBuffer("insert into test (name, `desc`, column1, column2, column3, column4) " + "values "); while ((line = br.readLine()) != null) {// 循环读取每一行 idx++;// 计数 column = line.split(","); sql.append("(\'" + column[0] + "\', \'"); if (column.length >= 2 && column[1] != null) { sql.append(column[1] + "\', \'"); } else { sql.append("\', \'"); } if (column.length >= 3 && column[2] != null) { sql.append(column[2] + "\', \'"); } else { sql.append("\', \'"); } if (column.length >= 4 && column[3] != null) { sql.append(column[3] + "\', \'"); } else { sql.append("\', \'"); } sql.append("type\', \'1\'),"); if (idx % 10000 == 0) { String executeSql = sql.toString().substring(0, sql.toString().lastIndexOf(",")); ps = conn.prepareStatement(executeSql); ps.executeUpdate(); sql = new StringBuffer("insert into test (name, `desc`, column1, column2, column3, column4) " + "values "); } } if (idx % 10000 != 0) { String executeSql = sql.toString().substring(0, sql.toString().lastIndexOf(",")); ps = conn.prepareStatement(executeSql); ps.executeUpdate(); } } catch (Exception e) { System.out.println("第" + idx + "前一万条数据插入出错..."); } finally { try { if (ps != null) { // 关闭连接 ps.close(); } if (conn != null) { conn.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return idx; } /** * 获取数据库连接 * * @param sql * SQL语句 */ private Connection getConnection() throws Exception { Class.forName("com.mysql.jdbc.Driver"); conn = (Connection) DriverManager.getConnection(url, name, pwd); return conn; } }
速率比较:为了排除其他影响,两次次都是在空表的情况下进行导入的
用SQL拼接批量插入用时大概3-4分钟
用PreparedStatement批量插入用时大概10分钟
以上是关于java大批量数据导入(MySQL)的主要内容,如果未能解决你的问题,请参考以下文章