MySQL AutoIncrement--PXC集群批量插入操作获取自增ID异常问题

Posted 笑东风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MySQL AutoIncrement--PXC集群批量插入操作获取自增ID异常问题相关的知识,希望对你有一定的参考价值。

问题描述

由于MySQL PXC集群的所有节点均可读写,因此当PXC集群中节点增加和减少时,PXC集群会自动调整集群各节点的自增ID步长,避免不同集群节点生成相同自增ID值产生冲突。

当PXC集群中读节点数量发生变化时,客户端执行BatchInsert方法可能返回错误的自增ID值。

问题原因

由于MySQL数据库仅提供LAST_INSERT_ID()函数来获取最后插入记录的自增ID,如果批量插入多条记录时,则返回批量插入记录的第一个自增ID值,如:

# 获取MySQL服务器的自增步长
mysql> SHOW VARIABLES LIKE \'%auto_increment%\';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| auto_increment_increment | 1     |
| auto_increment_offset    | 1     |
+--------------------------+-------+
2 rows in set (0.00 sec)
 
 
 
 
mysql> CREATE TABLE tb102(id INT AUTO_INCREMENT PRIMARY KEY ,c1 INT);
Query OK, 0 rows affected (0.00 sec)
 
mysql> INSERT INTO tb102(c1)VALUES(1);
Query OK, 1 row affected (0.00 sec)
 
mysql> INSERT INTO tb102(c1)VALUES(2);
Query OK, 1 row affected (0.01 sec)
 
# 获取上一条INSERT插入操作产生的自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)
 
 
mysql> INSERT INTO tb102(c1)VALUES(3),(4),(5);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0
 
# 获取上一条INSERT插入操作产生的自增ID。
# 如果一次INSERT插入多条记录,则返回第一个自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
|                3 |
+------------------+
1 row in set (0.00 sec)
 
mysql> SELECT * FROM tb102;
+----+------+
| id | c1   |
+----+------+
|  1 |    1 |
|  2 |    2 |
|  3 |    3 |
|  4 |    4 |
|  5 |    5 |
+----+------+
5 rows in set (0.00 sec)

当应用程序使用BatchInsert方式批量插入多条记录且需要返回多条记录对应的自增ID时,客户端会:

执行LAST_INSERT_ID() 获取批量插入的第一个自增ID
按照批量插入操作的影响行数循环,在第一自增ID值上依次增加自增步长

当应用程序采用批量插入多条记录时,会通过executeBatch-->executeBatchInternal-->getBatchedGeneratedKeys-->getGeneratedKeysInternal来获取到批量插入记录的自增ID:

protected ResultSetInternalMethods getGeneratedKeysInternal(long numKeys) throws SQLException 
    synchronized (checkClosed().getConnectionMutex()) 
        String encoding = this.session.getServerSession().getCharsetSettings().getMetadataEncoding();
        int collationIndex = this.session.getServerSession().getCharsetSettings().getMetadataCollationIndex();
        Field[] fields = new Field[1];
        fields[0] = new Field("", "GENERATED_KEY", collationIndex, encoding, MysqlType.BIGINT_UNSIGNED, 20);
 
        ArrayList<Row> rowSet = new ArrayList<>();
 
        long beginAt = getLastInsertID();
 
        if (this.results != null) 
            String serverInfo = this.results.getServerInfo();
 
            //
            // Only parse server info messages for \'REPLACE\' queries
            //
            if ((numKeys > 0) && (this.results.getFirstCharOfQuery() == \'R\') && (serverInfo != null) && (serverInfo.length() > 0)) 
                numKeys = getRecordCountFromInfo(serverInfo);
            
 
            if ((beginAt != 0 /* BIGINT UNSIGNED can wrap the protocol representation */) && (numKeys > 0)) 
                for (int i = 0; i < numKeys; i++) 
                    byte[][] row = new byte[1][];
                    if (beginAt > 0) 
                        row[0] = StringUtils.getBytes(Long.toString(beginAt));
                     else 
                        byte[] asBytes = new byte[8];
                        asBytes[7] = (byte) (beginAt & 0xff);
                        asBytes[6] = (byte) (beginAt >>> 8);
                        asBytes[5] = (byte) (beginAt >>> 16);
                        asBytes[4] = (byte) (beginAt >>> 24);
                        asBytes[3] = (byte) (beginAt >>> 32);
                        asBytes[2] = (byte) (beginAt >>> 40);
                        asBytes[1] = (byte) (beginAt >>> 48);
                        asBytes[0] = (byte) (beginAt >>> 56);
 
                        BigInteger val = new BigInteger(1, asBytes);
 
                        row[0] = val.toString().getBytes();
                    
                    rowSet.add(new ByteArrayRow(row, getExceptionInterceptor()));
                    beginAt += this.connection.getAutoIncrementIncrement();
                
            
        
 
        ResultSetImpl gkRs = this.resultSetFactory.createFromResultsetRows(ResultSet.CONCUR_READ_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE,
                new ResultsetRowsStatic(rowSet, new DefaultColumnDefinition(fields)));
 
        return gkRs;
    

客户端使用LAST_INSERT_ID()获取最后插入的自增列的值

/**
 * getLastInsertID returns the value of the auto_incremented key after an
 * executeQuery() or excute() call.
 *
 * <p>
 * This gets around the un-threadsafe behavior of "select LAST_INSERT_ID()" which is tied to the Connection that created this Statement, and therefore could
 * have had many INSERTS performed before one gets a chance to call "select LAST_INSERT_ID()".
 * </p>
 *
 * @return the last update ID.
 */
public long getLastInsertID() 
    synchronized (checkClosed().getConnectionMutex()) 
        return this.lastInsertId;
    

客户端在连接初始化时获取MySQL服务器端的参数变量auto_increment_increment并缓存到本地重复使用:

@Override
public int getAutoIncrementIncrement() 
    return this.autoIncrementIncrement;

 
 
 /**
 * Sets varying properties that depend on server information. Called once we
 * have connected to the server.
 *
 * @throws SQLException
 *             if a database access error occurs
 */
private void initializePropsFromServer() throws SQLException 
    this.autoIncrementIncrement = this.session.getServerSession().getServerVariable("auto_increment_increment", 1);

当MySQL服务器端自增步长随PXC节点变化而变化时,MySQL客户端仍使用缓存的自增步长(auto_increment_increment),则会导致MySQL客户端计算得到的自增值与MySQL服务器端实际产生的自增值不同。

优化建议

由于PXC节点数量变化无法控制(如硬件故障导致节点强制下线),因此无法保证PXC节点长期保持相同自增步长。

即使使用发布订阅机制在PXC集群节点发生变化时通知客户端重连并刷新本地缓存的自增步长,在PXC集群节点变化到MySQL客户端重连期间仍会存在该问题。

因此建议:

  • 如果业务不依赖BatchInsert方法返回的批量自增列值时,可继续使用BatchInsert方法。
  • 如果业务严重依赖BatchInsert方法返回的批量自增列值时,将批量插入操作改为多次单条插入操作,在每次单条记录插入后获取自增列值,并将多次插入插入封装到一个事务中。该方式与批量插入操作相比会增加多次请求的网络延时,可通过多并发方式来解决。

C++遍历mysql结果集,一万条数据花多长时间?

我用C++程序从mysql里取出一个包含10000条数据的ResultSet,遍历这个结果集(就是下面这个while循环):
while(Row row = res.fetch_row())


循环中不执行任何操作,居然就花了3秒多,这个时间正常不?
我感觉应该很快才对唉~~

参考技术A res.fetch_row()这个函数的开销够大的追问

有没有开销小点的……

追答

把Row row放在括号前面,每一次循环都要建立变量也耗时。

追问

试了一下,少了200ms左右……谢谢!~~

参考技术B 还可以吧,什么机器配置?追问

这10000条数据大概是1.5 MB
机器是:
Windows 7 X64
i5 3.10GHz, 8GB RAM

参考技术C 正常,准备遍历了要很长时间 参考技术D fetch_row()遍历要很长时间

以上是关于MySQL AutoIncrement--PXC集群批量插入操作获取自增ID异常问题的主要内容,如果未能解决你的问题,请参考以下文章

C++遍历mysql结果集,一万条数据花多长时间?

MySQL字符集与校对集详解

如何遍历mysql结果集

MySQL垂直显示查询结果集

MySQL 字符集和校对集

为啥 MySQL 给出错误“不允许从函数返回结果集”?