Node.js mysql 事务

Posted

技术标签:

【中文标题】Node.js mysql 事务【英文标题】:Node.js mysql transaction 【发布时间】:2013-03-23 12:12:30 【问题描述】:

谁能提供我如何在 Node.js 中实现 mysql 事务的示例。我正在尝试使用 node-mysql 驱动程序和 node-mysql-queue。

据我所知,使用 node-mysql-queue 大大降低了 Node.js 的异步特性,因为新查询必须等到现有查询完成。为了解决这个问题,有没有人尝试将 node-mysql-queue 与 node-mysql 的连接池功能结合起来。即为每个新的 http 请求启动一个新的 mysql 连接,并在各个连接上启动事务队列?

【问题讨论】:

【参考方案1】:

更新

请参阅下面的编辑了解 async/await 语法


我花了一些时间写了一个node mysql给出的事务示例的通用版本,所以我想在这里分享一下。我使用 Bluebird 作为我的承诺库,并用它来“承诺”连接对象,这大大简化了异步逻辑。

const Promise = ('bluebird');
const mysql = ('mysql');

/**
 * Run multiple queries on the database using a transaction. A list of SQL queries
 * should be provided, along with a list of values to inject into the queries.
 * @param  array queries     An array of mysql queries. These can contain `?`s
 *                              which will be replaced with values in `queryValues`.
 * @param  array queryValues An array of arrays that is the same length as `queries`.
 *                              Each array in `queryValues` should contain values to
 *                              replace the `?`s in the corresponding query in `queries`.
 *                              If a query has no `?`s, an empty array should be provided.
 * @return Promise           A Promise that is fulfilled with an array of the
 *                              results of the passed in queries. The results in the
 *                              returned array are at respective positions to the
 *                              provided queries.
 */
function transaction(queries, queryValues) 
    if (queries.length !== queryValues.length) 
        return Promise.reject(
            'Number of provided queries did not match the number of provided query values arrays'
        )
    

    const connection = mysql.createConnection(databaseConfigs);
    Promise.promisifyAll(connection);
    return connection.connectAsync()
    .then(connection.beginTransactionAsync())
    .then(() => 
        const queryPromises = [];

        queries.forEach((query, index) => 
            queryPromises.push(connection.queryAsync(query, queryValues[index]));
        );
        return Promise.all(queryPromises);
    )
    .then(results => 
        return connection.commitAsync()
        .then(connection.endAsync())
        .then(() => 
            return results;
        );
    )
    .catch(err => 
        return connection.rollbackAsync()
        .then(connection.endAsync())
        .then(() => 
            return Promise.reject(err);
        );
    );

如果您想按照问题中的建议使用池化,您可以轻松地将createConnection 行与myPool.getConnection(...) 切换,并将connection.end 行与connection.release() 切换。


编辑

我使用mysql2 库(与mysql 相同的api,但支持promise)和新的async/await 运算符对代码进行了另一次迭代。就是这样

const mysql = require('mysql2/promise')

/** See documentation from original answer */
async function transaction(queries, queryValues) 
    if (queries.length !== queryValues.length) 
        return Promise.reject(
            'Number of provided queries did not match the number of provided query values arrays'
        )
    
    const connection = await mysql.createConnection(databaseConfigs)
    try 
        await connection.beginTransaction()
        const queryPromises = []

        queries.forEach((query, index) => 
            queryPromises.push(connection.query(query, queryValues[index]))
        )
        const results = await Promise.all(queryPromises)
        await connection.commit()
        await connection.end()
        return results
     catch (err) 
        await connection.rollback()
        await connection.end()
        return Promise.reject(err)
    

【讨论】:

【参考方案2】:

以下交易示例是一个月前添加到文档中的:

https://github.com/felixge/node-mysql#transactions

connection.beginTransaction(function(err) 
  if (err)  throw err; 
  connection.query('INSERT INTO posts SET title=?', title, function(err, result) 
    if (err)  
      connection.rollback(function() 
        throw err;
      );
    

    var log = 'Post ' + result.insertId + ' added';

    connection.query('INSERT INTO log SET data=?', log, function(err, result) 
      if (err)  
        connection.rollback(function() 
          throw err;
        );
        
      connection.commit(function(err) 
        if (err)  
          connection.rollback(function() 
            throw err;
          );
        
        console.log('success!');
      );
    );
  );
);

【讨论】:

这并不能真正解决异步问题。在事务完成之前,连接应该专门用于事务。 这不是回滚插入的第一个查询结果。【参考方案3】:

我正在使用以下方法。我的模型中有一个添加函数,我正在执行数据库操作。

  add : function (data, callback) 

    //Begin transaction
    connection.beginTransaction(function(err) 
        if (err) 
            throw err;
        

        var user_query = "INSERT INTO `calldata`.`users` (`username`, `password`, `enabled`, `accountNonExpired`, `accountNonLocked`, `credentialsNonExpired`) VALUES ('" + data.mobile + "', '" + sha1(data.password) + "', '1', '1', '1', '1')";
        connection.query(user_query, function(err, results) 
            if (err) 
                return connection.rollback(function() 
                    throw err;
                );
            

            var accnt_dtls_query = "INSERT INTO `calldata`.`accnt_dtls` (`req_mob_nmbr`, `usr_nme`, `dvce_id`, `mngr_id`, `cmpny_id`, `actve_flg`, `crtd_on`, `usr`) VALUES (" + data.mobile + ", '" + data.name + "', '', " + data.managerId + ", " + data.companyId + ", 1, now(), '" + data.mobile+ "')";

            connection.query(accnt_dtls_query, function(err, results) 
                if (err) 
                    return connection.rollback(function() 
                        throw err;
                    );
                
                var user_role_query = "INSERT INTO `calldata`.`user_roles` (`username`, `ROLE`) VALUES ('" + data.mobile + "', '" + data.role + "')";

                connection.query(user_role_query, function(err, result) 
                    if (err) 
                        return connection.rollback(function() 
                            throw err;
                        );
                    

                    //add an entry to manager table
                    var mngr_dtls_query = "INSERT INTO `calldata`.`mngr_dtls` (`mngr_nm`, `cmpny_id`, `crtd_on`, `usr_nm`, `eml_id`) VALUES ('" + data.name + "'," + data.companyId + " , now(), '" + data.mobile + "', '" + data.mobile + "')";
                    connection.query(mngr_dtls_query, function(err, result) 
                        if (err) 
                            return connection.rollback(function () 
                                throw err;
                            );
                        
                        console.log('Changed ' + result.changedRows + ' results');
                        connection.commit(function (err) 
                            console.log('Commiting transaction.....');
                            if (err) 
                                return connection.rollback(function () 
                                    throw err;
                                );
                            

                            console.log('Transaction Complete.');
                            connection.end();
                            callback(null, result);
                        );
                    );
                );
            );
        );
    );
    //transaction ends here

并从控制器调用:

 agentAccountModel.add(data, function(err, results) 
                if(err)
                
                    res.status(500);
                    res.json(
                        "status": 500,
                        "message": err
                    );
                

                res.status(200);
                res.json(
                    "status": 200,
                    "message": "Saved successfully"

                );
            );

【讨论】:

回调地狱? @Splines ...是的,这是非常古老的代码,您可以使用 async 和 await 或 Promise.all() !! 谢谢,是的,刚刚发现 mysql2 提供承诺 here 和 this 是与交易结合使用的绝佳代码示例。【参考方案4】:

我想出了一个使用递归函数的解决方案。

var sql = 'INSERT INTO logs SET data = ?';

// array of rows to insert
var rows = [[/*first row*/], [/*additional row*/]];

connection.beginTransaction(function (err) 

    if (err)  
        throw err; 
    

    var insertEachRow = function () 

        var row = rows.shift();

        if (! row) 
            // Done, now commit
            return noMoreRows();
        

        connection.query(sql, row, function (err, result) 
            if (err)  
                connection.rollback(function () 
                    throw err;
                );
              

            insertEachRow();
        );
    ;

    var noMoreRows = function () 
        connection.commit(function (err) 
            if (err)  
                connection.rollback(function () 
                    throw err;
                );
            
            console.log('success!');
        );
    ;

    insertEachRow();
);

【讨论】:

【参考方案5】:

我发现了一个有用的链接,它使用 node js mysql pooling with transaction。数据库连接池总是有用的。大家可以看看这个链接

https://github.com/mysqljs/mysql

【讨论】:

【参考方案6】:

我为此特定目的创建了一个包装器 ORM 类型的东西,希望它有助于SQl-connecton - pool ORM type helper methods

【讨论】:

【参考方案7】:

你也可以和mysql一起使用,只使用这个函数,不带任何参数 /call back 。 确保你有异步功能并使用这些来代替

await con.rollback();
await con.beginTransaction();
await con.commit();

生活设定.....

【讨论】:

以上是关于Node.js mysql 事务的主要内容,如果未能解决你的问题,请参考以下文章

在 Node.js 中处理回滚的 MySQL 事务

使用 mysql 和 node.js 的可序列化事务

node.js mysql 池 beginTransaction & 连接

Async.js解决Node.js操作MySQL的回调大坑

Java进阶:mysql的事务隔离级别面试题

node.js + postgres 数据库事务管理