Java 中 MySQL 插入语句的性能:批处理模式准备语句与具有多个值的单个插入

Posted

技术标签:

【中文标题】Java 中 MySQL 插入语句的性能:批处理模式准备语句与具有多个值的单个插入【英文标题】:Performance of MySQL Insert statements in Java: Batch mode prepared statements vs single insert with multiple values 【发布时间】:2012-07-08 12:29:58 【问题描述】:

我正在设计一个mysql 数据库,该数据库需要在各种 InnoDB 表中每秒处理大约 600 行插入。我当前的实现使用非批处理准备语句。但是,写入MySQL 数据库会遇到瓶颈,并且我的队列大小会随着时间的推移而增加。

实现是用 Java 编写的,我不知道手头的版本。它使用MySQL 的Java connector。我需要考虑明天切换到JDBC。我假设这是两个不同的连接器包。

我已经阅读了关于这个问题的以下主题:

Optimizing MySQL inserts to handle a data stream MyISAM versus InnoDB Inserting Binary data into MySQL (without PreparedStatement's)

来自 mysql 网站:

http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html

我的问题是:

对于在批处理模式下使用带有准备好的语句的 INSERT 与使用带有多个 VALUE 的单个 INSERT 语句的性能差异,是否有人有任何建议或经验。

MySQL Java 连接器与JDBC 之间的性能差异是什么。我应该使用其中一个吗?

这些表用于存档目的,大约 90% 的写入到大约 10% 的读取(可能甚至更少)。我正在使用 InnoDB。这是不是 MyISAM 的正确选择?

提前感谢您的帮助。

【问题讨论】:

在使用批量插入时,您将在单个事务中执行此操作。在其他情况下,您将有每行插入的事务。 也许 dba.stackexchange 会更适合这个问题。 +1 感谢您所做的研究和努力,尽管这是您的第一篇文章。 【参考方案1】:

您在任何受影响的表上是否有任何触发器?如果没有,每秒 600 次插入看起来并不多。

来自 JDBC 的批量插入功能将在同一事务中多次发出同一语句,而多值 SQL 将在单个语句中压缩所有值。在多值语句的情况下,您将不得不动态构造插入 SQL,这可能是更多代码、更多内存、SQL 注入保护机制等方面的开销。首先尝试常规批处理功能,对于您的工作负载,它应该不是问题。

如果您没有批量接收数据,请考虑在插入之前对其进行批处理。 我们在单独的线程上使用队列来实现生产者-消费者的安排。在这种情况下,我们会阻止插入,直到经过特定时间或队列的大小超过阈值。

如果您希望通知生产者插入成功,则需要更多的管道。

有时仅仅阻塞线程可能更直接和实用。

if(System.currentTimeMills()-lastInsertTime>TIME_THRESHOLD || queue.size()>SIZE_THRESHOLD) 
    lastInsertTime=System.currentTimeMills();
    // Insert logic
     else 
    // Do nothing OR sleep for some time OR retry after some time. 
    

【讨论】:

感谢您的建议。我今天做了一些研究,并建立了基本的生产者-消费者关系。我的数据处理器在一个线程中工作,将信息添加到属于 mysql 插入线程的队列中。它似乎工作得很好。我使用 innodb 是因为我要尝试保留一些重要的外键关系。看来他们在研究生计划中可能并不是真正需要的,所以我明天可能会切换到 myISAM 看看情况如何。【参考方案2】:

JDBC 只是一个 Java SE 数据库访问标准,提供标准接口,因此您不必真正绑定到特定的 JDBC 实现。 MySQL Java 连接器 (Connector/J) 是仅用于 MySQL 数据库的 JDBC 接口的实现。出于经验,我参与了一个使用 MySQL 使用大量数据的项目,对于可以生成的数据,我们更喜欢 MyISAM:它可以实现更高的性能丢失事务,但一般来说,MyISAM 更快,但 InnoDB 更可靠。

大约一年前,我也想知道 INSERT 语句的性能,并在我的代码架中找到了以下旧测试代码(抱歉,它有点复杂,有点超出你的问题范围)。下面的代码包含了 4 种插入测试数据的示例:

单身 INSERTs; 批处理 INSERTs; 手动批量 INSERT(永远不要使用它 - 这很危险); 最后准备大容量INSERT)。

它使用TestNG 作为运行器,并使用一些自定义代码遗留,例如:

runWithConnection() 方法 - 确保在执行回调后关闭连接或放回连接池(但下面的代码使用不可靠的语句关闭策略 - 即使没有 try/finally减少代码); IUnsafeIn<T, E extends Throwable> - 接受单个参数但可能引发 E 类型异常的方法的自定义回调接口,例如:void handle(T argument) throws E;
package test;

import test.IUnsafeIn;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;

import core.SqlBaseTest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public final class InsertVsBatchInsertTest extends SqlBaseTest 

    private static final int ITERATION_COUNT = 3000;

    private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB";
    private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1";
    private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1";

    private static void withinTimer(String name, Runnable runnable) 
        final long start = currentTimeMillis();
        runnable.run();
        logStdOutF("%20s: %d ms", name, currentTimeMillis() - start);
    

    @BeforeSuite
    public void createTable() 
        runWithConnection(new IUnsafeIn<Connection, SQLException>() 
            @Override
            public void handle(Connection connection) throws SQLException 
                final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY);
                statement.execute();
                statement.close();
            
        );
    

    @AfterSuite
    public void dropTable() 
        runWithConnection(new IUnsafeIn<Connection, SQLException>() 
            @Override
            public void handle(Connection connection) throws SQLException 
                final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY);
                statement.execute();
                statement.close();
            
        );
    

    @BeforeTest
    public void clearTestTable() 
        runWithConnection(new IUnsafeIn<Connection, SQLException>() 
            @Override
            public void handle(Connection connection) throws SQLException 
                final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY);
                statement.execute();
                statement.close();
            
        );
    

    @Test
    public void run1SingleInserts() 
        withinTimer("Single inserts", new Runnable() 
            @Override
            public void run() 
                runWithConnection(new IUnsafeIn<Connection, SQLException>() 
                    @Override
                    public void handle(Connection connection) throws SQLException 
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) 
                            final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.execute();
                            statement.close();
                        
                    
                );
            
        );
    

    @Test
    public void run2BatchInsert() 
        withinTimer("Batch insert", new Runnable() 
            @Override
            public void run() 
                runWithConnection(new IUnsafeIn<Connection, SQLException>() 
                    @Override
                    public void handle(Connection connection) throws SQLException 
                        final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) 
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.addBatch();
                        
                        statement.executeBatch();
                        statement.close();
                    
                );
            
        );
    

    @Test
    public void run3DirtyBulkInsert() 
        withinTimer("Dirty bulk insert", new Runnable() 
            @Override
            public void run() 
                runWithConnection(new IUnsafeIn<Connection, SQLException>() 
                    @Override
                    public void handle(Connection connection) throws SQLException 
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) 
                            if ( i != 0 ) 
                                builder.append(",");
                            
                            builder.append(format("(%s, %s, '%s')", i, i, i));
                        
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        statement.execute();
                        statement.close();
                    
                );
            
        );
    

    @Test
    public void run4SafeBulkInsert() 
        withinTimer("Safe bulk insert", new Runnable() 
            @Override
            public void run() 
                runWithConnection(new IUnsafeIn<Connection, SQLException>() 
                    private String getInsertPlaceholders(int placeholderCount) 
                        final StringBuilder builder = new StringBuilder("(");
                        for ( int i = 0; i < placeholderCount; i++ ) 
                            if ( i != 0 ) 
                                builder.append(",");
                            
                            builder.append("?");
                        
                        return builder.append(")").toString();
                    

                    @SuppressWarnings("AssignmentToForLoopParameter")
                    @Override
                    public void handle(Connection connection) throws SQLException 
                        final int columnCount = 3;
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        final String placeholders = getInsertPlaceholders(columnCount);
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) 
                            if ( i != 0 ) 
                                builder.append(",");
                            
                            builder.append(placeholders);
                        
                        final int maxParameterIndex = ITERATION_COUNT * columnCount;
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        int valueIndex = 0;
                        for ( int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++ ) 
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                        
                        statement.execute();
                        statement.close();
                    
                );
            
        );
    


看看用@Test 注解注解的方法:它们实际上执行INSERT 语句。另请查看CREATE_TABLE_QUERY 常量:在源代码中,它使用 InnoDB 在我安装了 MySQL 5.5 的机器上产生以下结果(MySQL Connector/J 5.1.12):

InnoDB
Single inserts: 74148 ms
Batch insert: 84370 ms
Dirty bulk insert: 178 ms
Safe bulk insert: 118 ms

如果您将 CREATE_TABLE_QUERY InnoDB 更改为 MyISAM,您会看到性能显着提升:

MyISAM
Single inserts: 604 ms
Batch insert: 447 ms
Dirty bulk insert: 63 ms
Safe bulk insert: 26 ms

希望这会有所帮助。

更新:

对于第 4 种方式,您必须正确自定义 mysql.ini[mysqld] 部分)中的 max_allowed_packet,使其足够大以支持非常大的数据包。

【讨论】:

感谢基准测试,这是我可以要求的最直接的答案。我今天实施了成批准备好的插入,它就像一个魅力! 知道为什么在 InnoDB 上批量插入比单次插入慢吗? @stracktracer 嗯嗯,确实。很难说,因为它可能只与特定的测试用例有关。感谢您的通知。【参考方案3】:

我知道这个线程已经很老了,但我只是想我会提到,如果你在使用 mysql 时将“rewriteBatchedStatements=true”添加到 jdbc url,它可以在使用批处理语句时带来巨大的性能提升。

【讨论】:

这是我第一次遇到“rewriteBatchedStatements”设置。对于大批量语句,这将速度提高了几个数量级,这正是我所寻找的。请参阅***.com/questions/26307760 了解更多信息。【参考方案4】:

经过我自己的一些测试,Jordan L 给出了最好的提示。我认为执行时间 Lyubomyr 给出的 InnoDB 非脏批量插入是错误的,因为他很可能没有在 JDBC 连接字符串中使用“rewriteBatchedStatements=true”。没有它,批次就一文不值。在我自己的测试中,使用准备好的语句的非脏批量插入甚至比使用准备好的语句的脏方法更快。

【讨论】:

以上是关于Java 中 MySQL 插入语句的性能:批处理模式准备语句与具有多个值的单个插入的主要内容,如果未能解决你的问题,请参考以下文章

java对mysql进行批处理插入数据,如何返回错误的没插入数据

向mysql中批量插入数据的性能分析

MYSQL批量插入数据库实现语句性能分析

MYSQL批量插入数据库实现语句性能分析

MYSQL批量插入数据库实现语句性能分析

java中使用mysql批量插入大文本(二进制)数据时出错