事务与批处理查询以避免重复的 MySQL 插入

Posted

技术标签:

【中文标题】事务与批处理查询以避免重复的 MySQL 插入【英文标题】:Transaction vs Batch Query to Avoid Duplicate MySQL Inserts 【发布时间】:2021-10-07 02:35:45 【问题描述】:

我有一个 php 脚本 (deleteAndReInsert.php),它会删除 name = 'Bob' 所在的所有行,然后使用 name = 'Bob' 插入 1000 个新行。这可以正常工作,最初为空的表最终有 1000 行,如预期的那样。

$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)

    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);

问题是如果我运行deleteAndReInsert.php 两次(几乎在同一时间),最终表包含超过 1000 行。

似乎正在发生的事情是第一次运行的 DELETE 查询完成,然后许多(但不是全部)1000 INSERTS 被调用。

然后第二个 DELETE 查询开始并在前 1000 个 INSERTS 完成之前完成(比如 1000 个 INSERTS 中的 350 个完成)。现在第二个 1000 INSERTS 运行,我们最终得到 1650 行而不是 1000 总行,因为在第二个 DELETE 被调用后仍有 1000 - 350 = 650 INSERTS 剩余。

防止此问题发生的正确方法是什么?我应该将所有内容包装在事务中,还是应该进行 1 次批量插入调用而不是 1000 次单独插入?显然我可以实现这两种解决方案,但我很好奇哪一种可以保证防止这个问题。

【问题讨论】:

一个事务可能对你没有帮助,因为你有多个插入,并且没有真正的方法来判断哪个是对还是错。相反,我会研究应用程序级别的锁,这可以手动完成,也可以通过symfony 等组件完成。当您的脚本启动时,它应该在完成之前将自己锁定以防止其他运行。 可以在写入之前锁定整个表...事务无济于事,因为这只会隔离操作,但不会阻止其他人访问表。 桌子上的PRIMARY KEY 是什么?为完整起见,请提供SHOW CREATE TABLE table 有什么问题??肯定1650计数错了,但表没有错。你最终得到正好 1000 行。您没有提到是否涉及第二个连接。如果这是问题的一部分,您必须说明它是如何涉及的。 @RickJames 没有PRIMARY KEY,还有第二个连接。我几乎同时在两个不同的浏览器窗口中打开 deleteAndReInsert.php。 【参考方案1】:

使用事务+批量插入

我认为解决问题的正确方法是使用事务。我们要进行删除+批量插入,代码如下:

$pdo->beginTransaction();
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$sql = "INSERT INTO table (name, age) VALUES ".implode(', ',array_fill(0,999, '(:name, :age)'));
$query = $sth->prepare($sql);
$query->execute(array([ ':name' => 'Bob', 'age' => 34 ]));

$pdo->commit();

仅使用批量插入(不起作用)

为什么只做批量插入不能解决问题?想象以下场景:

    第一个脚本进行删除并删除前 1000 行。 ==> 将 1000 行变为 0。 第二个脚本尝试删除但没有行。 ==> 将 0 行变为 0。 第一个(或第二个)脚本进行 1000 个批量插入。 ==> 从 1000 行到 1000 行。 第二个(或第一个)脚本进行第二个 1000 批插入。 ==> 从 1000 行到 2000 行。

这就是为什么进程是异步的,所以第二个脚本可能会在第一个脚本完成插入之前读取表。

使用从表模拟锁(不推荐)

如果我们没有事务,我们将如何解决这个问题?我认为这是一个交叉练习。

这是一个典型的并发问题,其中有两个或多个进程修改相同的数据。为了解决这个问题,我建议你使用第二个辅助表来模拟Lock并控制对主表的并发访问。

CREATE TABLE `access_table` (
  `access` TINYINT(1) NOT NULL DEFAULT 1
)

在脚本中

// Here we control the concurrency
do
   $query = $st->prepare('UPDATE access_table SET access = 0 WHERE access = 1');
   $query ->execute();
   $count = $query ->rowCount();
   // You should put here a random sleep
while($count === 0);


//Here we know that only us we are modifying the table
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)

    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);



//And finally we open the table for other process
$query = $st->prepare('UPDATE access_table SET access = 1 WHERE access = 0');
$query ->execute();

您可以根据您的问题调整表格,例如,如果 INSERTS/DELETES 是按名称命名的,您可以使用 varchar(XX) 作为名称。

CREATE TABLE `access_table` (
  `name` VARCHAR(50) NOT NULL,
  `access` TINYINT(1) NOT NULL DEFAULT 1
)

在这种情况下

    第一个脚本将访问值更改为 0。 第二个脚本无法更改该值,因此它停留在循环中 第一个脚本进行 DELETES/INSERTS 第一个脚本将状态更改为 1 第二个脚本将访问值更改为 0 并破坏外观。 第二个脚本进行 DELETES/INSERTS 第二个脚本将状态更改为 1

这是因为更新是原子的,也就是说两个进程不能同时更新同一个日期,所以当第一个脚本更新值时,第二个脚本不能修改,这个动作是原子的。

希望对你有所帮助。

【讨论】:

如果“批量插入”是指一个有 1000 行的 INSERT,那么无论使用什么锁定机制,这都是原子的。请清楚您的“批量插入”的含义。 这是正确答案(使用事务),但是代码sn-p有错误。【参考方案2】:

其他解决方案的替代方法是在脚本启动时创建一个实际的锁定文件,并在运行前检查它是否存在。

while( file_exists("isrunning.lock") )
    sleep(1);


//create file isrunning.lock
$myfile = fopen("isrunning.lock", "w");

//deleteAndinsert code

//delete lock file when finished
fclose($myfile);
unlink("isrunning.lock");

【讨论】:

PHP 实际上有获取文件锁的内置函数 (php.net/manual/en/function.flock.php)。不建议滚动您自己的解决方案。【参考方案3】:

你必须锁定操作并且在插入结束之前不要释放它。

您可以在文件系统上使用文件,但正如@chris Hass 建议的那样,您可以像这样使用 symfony 的包:

安装 symfony 锁:

composer require symfony/lock

你应该包含作曲家的自动加载

require __DIR__.'/vendor/autoload.php';

然后在你的 deleteAndReInsert.php 中:

use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\SemaphoreStore;

//if you are on windows or for any reason this store(FlockStore) didnt work
// you can use another stores available here: https://symfony.com/doc/current/components/lock.html#available-stores 
$store = new FlockStore();
$factory = new LockFactory($store);
$lock = $factory->createLock('bob-recreation');
$lock->acquire(true)

$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)

    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);

$lock->release();

发生了什么

正如你所说,发生的事情是race condition:

如果两个并发进程正在访问一个共享资源, 这类似于 关键部分,可能需要用锁进行保护

【讨论】:

锁定数据库表而不是php文件更有意义吗? 如果第二个脚本不是用 PHP 编写的会怎样?如果它在其他计算机中?我觉得用代码来控制数据库访问并发不是一个好主意。 LockFactory 在幕后使用了什么 SQL? @Abilogos 我看到了链接,我认为你是对的,这就是我面临的问题。 @Abilogos 您的答案有效,但最简单的答案是用户进行交易。【参考方案4】:

计数是一个近似值

SHOW TABLE STATUS(以及许多类似的变体)仅提供行数的估计值。 (请说出您是如何获得“1650”的。)

准确的计数方法是

SELECT COUNT(*) FROM table;

进一步讨论

执行“事务锁定”的主要方法有两种。两者都可以防止其他连接干扰。

自动提交:

SET autocommit = ON;   -- probably this is the default
-- Now each SQL statement is a separate "transaction"

开始...提交

BEGIN;  -- (this is performed in a variety of ways by the db layer)
delete...
insert...
COMMIT;  --everything above either entire happens or is entirely ROLLBACK'd

性能:

DELETE --> TRUNCATE 批量插入(单个 INSERT 中的 1000 行) BEGIN...COMMIT LOAD DATA 而不是 INSERT

但是没有的性能技术会改变你遇到的问题——除了“巧合”。


为什么是 1650?

(或其他一些数字)InnoDB 的事务性质要求它挂在被删除或插入的行的先前副本上,直到COMMIT(无论是显式还是自动提交)。这会使数据库中出现可能消失的“行”。因此,任何尝试航位推算确切行数的尝试都是不切实际的。

这导致使用不同的技术来估计行数。它是这样的:表占用了这么多磁盘,我们估计平均行是这么多字节。将它们相除以获得行数。

这导致您关于删除未完成的理论。就任何 SQL 而言,Delete is 已完成。但是,临时保存的 1000 行副本尚未从表中彻底清除。因此,行数的计算不精确。


锁定?

锁定技术将“修复”1650。如果您不希望其他线程在运行 Delete+Insert 实验时插入/删除行,则需要锁定。为此,您应该使用锁定。

同时,如果您想要精确计数,必须使用COUNT(*)

【讨论】:

我使用的是 COUNT(*),所以 1650 的数字是准确的。【参考方案5】:

防止此问题发生的正确方法是什么?

这不是问题,是访问数据库上同一个表的两个页面的预期行为。

我应该将所有内容包装在事务中,还是应该进行 1 次批量插入调用而不是 1000 次单独插入?显然我可以实现这两种解决方案,但我很好奇哪一种可以保证防止这个问题。

除了将插入的数量限制为 n000 由您运行的页数之外,不会产生盲目的差异。


场景 1 - 什么都不做

有两个页面一个接一个或在相似的时间运行。这就是为什么您看到 1650 条记录的原因是由于执行方法中的隐式事务,允许其他进程(在您的情况下为页面)访问表中的数据。

Action Page a Page b Table Row count
1 Deletes all Bobs 0
... Insert a row 1
351 Insert a row Deletes all Bobs 0
352 Insert a row Insert a row 2
... Insert a row Insert a row 4
1001 Insert a row Insert a row 1298
1002 Insert a row 1299
... Insert a row ...
1352 Insert a row 1650

因此插入了 1650 个 Bob。


场景 2 - 使用显式事务(乐观) |行动 |第一个 | b页|表行数 |交易 | | ----- | ------ | ------ | --- | --- | | 1 |开始| | 0 | | | 2 |删除所有 Bob |开始| 0 | (a-d0)| | 3 |插入 1000 行 |删除所有 Bob | 0| (a-d0-i1000)(b-d1000) | | 4 |提交 |插入 1000 行 | 1000 | (b-d1000-i1000) | | 5 | |提交 | 1000 |


场景 3 - 添加锁定 |行动 |第一个 | b页|表行数 | | ----- | ------ | ------ | --- | | 1 | AQ锁| | 0 | | 2 |开始| | 0 | | 3 |删除所有 Bob | AQ锁| 0 | | 4 |插入 1000 行 |无锁| 0| | 5 |提交 |没有锁| 1000 | | 6 | | AQ锁| 1000 | | 6 | |开始| 1000 | | 6 | |删除所有 Bob | 1000 (0) | | 6 | |插入 1000 行 | 1000 (1000) | | 6 | |解锁 | 1000 |

【讨论】:

【参考方案6】:

如果有另一个实例,您可以检查服务器上的进程列表并阻止您的脚本执行。

【讨论】:

【参考方案7】:

你点击deleteAndReInsert.php两次,每个脚本有1001条命令,首先是删除所有name = Bob,剩下的就是再次插入1000次Bob。 所以你完全有 2002 个命令,并且你没有声明让 mysql 明白你想要同步执行它的东西,你的 2002 命令将并发运行,并会导致意想不到的结果。 (超过 1000 个插入 name= Bob)。这个过程可以这样描述:

->delete `name= bob` (clear count = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
->insert `name = bob`
->delete `name= bob` (the second time deleteAndReInsert.php 
hit deleted at 300 times insert `name = bob` of first
 time deleteAndReInsert.php -> clear count rows = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
-> insert `name = bob` (now it could be more than 1000 rows)

所以如果你想要结果是 1000 行。你必须让mysql明白:我想要同步运行的deleteAndReInsert.php,一步一步。并存档您可以执行以下解决方案之一:

    使用LOCK TABLE 语句锁定表并在完成时使用UNLOCK,这使得第二个脚本无法对表执行任何操作,除非第一个脚本完成。 将所有内容包装在事务BEGIN COMMIT 中,然后mysql 将作为原子操作运行。 (好) 通过 redis (Redlock)、file .. 模拟 LOCK 以使您的操作同步运行 (Good)

希望能帮助你解决问题。

【讨论】:

【参考方案8】:

您要做的是将LOCK TABLE ... WRITE 作为您工作的第一个声明,并将RELEASE TABLES 作为其最后一个声明。

那么这千行将被删除,然后插入,然后删除,然后再次插入。

但整个过程对我来说就像一个 XY 问题。您真正需要做什么?

因为我经常需要执行您所描述的类似操作(例如“刷新”一些摘要),而在这种情况下,我认为最好的方法是,两者都不 LOCK 也不 DELETE/INSERT,而是

INSERT INTO table
    ON DUPLICATE KEY UPDATE ...

就我而言,如果我只需要添加刷新记录,就足够了。

否则,我通常会添加一个“时间”字段,以便我识别刷新周期中“遗漏”的所有记录;那些——而且只有那些——在完成后被删除。

例如,假设我需要用复杂的 PHP 计算来计算许多客户的最大财务风险,然后插入到表格中以方便使用。每天晚上,每个客户都会刷新其值,然后在第二天使用“缓存”表。截断表格并重新插入所有内容很痛苦。

相反,我计算所有值并构建一个非常大的多个 INSERT 查询(如果需要,我可以将其拆分为 X 个更小的多个查询):

SELECT barrier:=NOW();
INSERT INTO `financial_exposures` ( ..., amount, customer_id, last_update )
    VALUES 
    ( ..., 172035.12, 12345, NOW()),
    ( ..., 123456.78, 12346, NOW()),
    ...
    ( ..., 450111.00, 99999, NOW())
     ON DUPLICATE KEY UPDATE 
         amount=VALUES(amount),
         last_update=VALUES(last_update);
 DELETE FROM financial_exposures WHERE last_update < @barrier;

插入新客户,更新旧客户,除非他们的值不变(在这种情况下,MySQL 会跳过更新,节省时间),并且在每一时刻,始终存在一条记录 - 更新之前的记录,或者更新前的记录更新后。被移除的客户将在最后一步被移除。

当您有一个需要经常使用更新的表时,这会更好。您可以添加不带锁的事务(INSERT 之前的SET autocommit = 0COMMIT WORK 之后的DELETE),以确保所有客户端都可以看到整个更新,就像它立即发生一样。

【讨论】:

这是一种有趣的技术,但它依赖于唯一的数据库约束。我认为这样的解决方案最适合大多数现实生活中的问题。问题中的问题有点棘手,因为没有数据库约束,添加一个没有意义。【参考方案9】:

@Pericodes 的回答是正确的,但是代码 sn-p 有错误。

您可以通过将代码包装在事务中来避免重复(不需要批量插入来停止重复)。

最好使用 1 个批量插入而不是 1000 个单独的插入,即使这不是必需的。

您可以通过运行此代码两次(几乎同时)进行测试,表格最终正好有 1000 条记录。

<?
$pdo->beginTransaction();

$query = $pdo->prepare("DELETE FROM t1 WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO t1 (name, age) VALUES (:name,:age)");
for ($i = 0; $i < 100; $i++)

    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);


$pdo->commit();

一些答案​​提到了锁(db 级和代码级),但对于这个问题来说,这些不是必需的,而且在 imo 中是多余的。

【讨论】:

以上是关于事务与批处理查询以避免重复的 MySQL 插入的主要内容,如果未能解决你的问题,请参考以下文章

插入批处理,如果在Codeigniter 3 HMVC中有重复的密钥更新

redis事务与管道区别

Mysql 批量插入事务唯一键重复处理

Mysql 批量插入事务唯一键重复处理

具有多个多对多关系的休眠批处理事务

Java 中 MySQL 插入语句的性能:批处理模式准备语句与具有多个值的单个插入