事务与批处理查询以避免重复的 MySQL 插入
Posted
技术标签:
【中文标题】事务与批处理查询以避免重复的 MySQL 插入【英文标题】:Transaction vs Batch Query to Avoid Duplicate MySQL Inserts 【发布时间】:2021-10-07 02:35:45 【问题描述】:我有一个 php
脚本 (deleteAndReInsert.php
),它会删除 name = 'Bob'
所在的所有行,然后使用 name = 'Bob'
插入 1000 个新行。这可以正常工作,最初为空的表最终有 1000 行,如预期的那样。
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);
$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
$query->execute([ 'name' => 'Bob', 'age' => 34 ]);
问题是如果我运行deleteAndReInsert.php
两次(几乎在同一时间),最终表包含超过 1000 行。
似乎正在发生的事情是第一次运行的 DELETE
查询完成,然后许多(但不是全部)1000 INSERTS
被调用。
然后第二个 DELETE
查询开始并在前 1000 个 INSERTS
完成之前完成(比如 1000 个 INSERTS
中的 350 个完成)。现在第二个 1000 INSERTS
运行,我们最终得到 1650 行而不是 1000 总行,因为在第二个 DELETE
被调用后仍有 1000 - 350 = 650 INSERTS
剩余。
防止此问题发生的正确方法是什么?我应该将所有内容包装在事务中,还是应该进行 1 次批量插入调用而不是 1000 次单独插入?显然我可以实现这两种解决方案,但我很好奇哪一种可以保证防止这个问题。
【问题讨论】:
一个事务可能对你没有帮助,因为你有多个插入,并且没有真正的方法来判断哪个是对还是错。相反,我会研究应用程序级别的锁,这可以手动完成,也可以通过symfony 等组件完成。当您的脚本启动时,它应该在完成之前将自己锁定以防止其他运行。 您可以在写入之前锁定整个表...事务无济于事,因为这只会隔离操作,但不会阻止其他人访问表。 桌子上的PRIMARY KEY
是什么?为完整起见,请提供SHOW CREATE TABLE table
。
有什么问题??肯定1650计数错了,但表没有错。你最终得到正好 1000 行。您没有提到是否涉及第二个连接。如果这是问题的一部分,您必须说明它是如何涉及的。
@RickJames 没有PRIMARY KEY
,还有第二个连接。我几乎同时在两个不同的浏览器窗口中打开 deleteAndReInsert.php。
【参考方案1】:
使用事务+批量插入
我认为解决问题的正确方法是使用事务。我们要进行删除+批量插入,代码如下:
$pdo->beginTransaction();
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);
$sql = "INSERT INTO table (name, age) VALUES ".implode(', ',array_fill(0,999, '(:name, :age)'));
$query = $sth->prepare($sql);
$query->execute(array([ ':name' => 'Bob', 'age' => 34 ]));
$pdo->commit();
仅使用批量插入(不起作用)
为什么只做批量插入不能解决问题?想象以下场景:
-
第一个脚本进行删除并删除前 1000 行。 ==> 将 1000 行变为 0。
第二个脚本尝试删除但没有行。 ==> 将 0 行变为 0。
第一个(或第二个)脚本进行 1000 个批量插入。 ==> 从 1000 行到 1000 行。
第二个(或第一个)脚本进行第二个 1000 批插入。 ==> 从 1000 行到 2000 行。
这就是为什么进程是异步的,所以第二个脚本可能会在第一个脚本完成插入之前读取表。
使用从表模拟锁(不推荐)
如果我们没有事务,我们将如何解决这个问题?我认为这是一个交叉练习。
这是一个典型的并发问题,其中有两个或多个进程修改相同的数据。为了解决这个问题,我建议你使用第二个辅助表来模拟Lock并控制对主表的并发访问。
CREATE TABLE `access_table` (
`access` TINYINT(1) NOT NULL DEFAULT 1
)
在脚本中
// Here we control the concurrency
do
$query = $st->prepare('UPDATE access_table SET access = 0 WHERE access = 1');
$query ->execute();
$count = $query ->rowCount();
// You should put here a random sleep
while($count === 0);
//Here we know that only us we are modifying the table
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);
$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
$query->execute([ 'name' => 'Bob', 'age' => 34 ]);
//And finally we open the table for other process
$query = $st->prepare('UPDATE access_table SET access = 1 WHERE access = 0');
$query ->execute();
您可以根据您的问题调整表格,例如,如果 INSERTS/DELETES 是按名称命名的,您可以使用 varchar(XX)
作为名称。
CREATE TABLE `access_table` (
`name` VARCHAR(50) NOT NULL,
`access` TINYINT(1) NOT NULL DEFAULT 1
)
在这种情况下
-
第一个脚本将访问值更改为 0。
第二个脚本无法更改该值,因此它停留在循环中
第一个脚本进行 DELETES/INSERTS
第一个脚本将状态更改为 1
第二个脚本将访问值更改为 0 并破坏外观。
第二个脚本进行 DELETES/INSERTS
第二个脚本将状态更改为 1
这是因为更新是原子的,也就是说两个进程不能同时更新同一个日期,所以当第一个脚本更新值时,第二个脚本不能修改,这个动作是原子的。
希望对你有所帮助。
【讨论】:
如果“批量插入”是指一个有 1000 行的INSERT
,那么无论使用什么锁定机制,这都是原子的。请清楚您的“批量插入”的含义。
这是正确答案(使用事务),但是代码sn-p有错误。【参考方案2】:
其他解决方案的替代方法是在脚本启动时创建一个实际的锁定文件,并在运行前检查它是否存在。
while( file_exists("isrunning.lock") )
sleep(1);
//create file isrunning.lock
$myfile = fopen("isrunning.lock", "w");
//deleteAndinsert code
//delete lock file when finished
fclose($myfile);
unlink("isrunning.lock");
【讨论】:
PHP 实际上有获取文件锁的内置函数 (php.net/manual/en/function.flock.php)。不建议滚动您自己的解决方案。【参考方案3】:你必须锁定操作并且在插入结束之前不要释放它。
您可以在文件系统上使用文件,但正如@chris Hass 建议的那样,您可以像这样使用 symfony 的包:
安装 symfony 锁:
composer require symfony/lock
你应该包含作曲家的自动加载
require __DIR__.'/vendor/autoload.php';
然后在你的 deleteAndReInsert.php 中:
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\SemaphoreStore;
//if you are on windows or for any reason this store(FlockStore) didnt work
// you can use another stores available here: https://symfony.com/doc/current/components/lock.html#available-stores
$store = new FlockStore();
$factory = new LockFactory($store);
$lock = $factory->createLock('bob-recreation');
$lock->acquire(true)
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);
$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
$query->execute([ 'name' => 'Bob', 'age' => 34 ]);
$lock->release();
发生了什么
正如你所说,发生的事情是race condition:
如果两个并发进程正在访问一个共享资源, 这类似于 关键部分,可能需要用锁进行保护
【讨论】:
锁定数据库表而不是php文件更有意义吗? 如果第二个脚本不是用 PHP 编写的会怎样?如果它在其他计算机中?我觉得用代码来控制数据库访问并发不是一个好主意。LockFactory
在幕后使用了什么 SQL?
@Abilogos 我看到了链接,我认为你是对的,这就是我面临的问题。
@Abilogos 您的答案有效,但最简单的答案是用户进行交易。【参考方案4】:
计数是一个近似值
SHOW TABLE STATUS
(以及许多类似的变体)仅提供行数的估计值。 (请说出您是如何获得“1650”的。)
准确的计数方法是
SELECT COUNT(*) FROM table;
进一步讨论
执行“事务锁定”的主要方法有两种。两者都可以防止其他连接干扰。
自动提交:
SET autocommit = ON; -- probably this is the default
-- Now each SQL statement is a separate "transaction"
开始...提交
BEGIN; -- (this is performed in a variety of ways by the db layer)
delete...
insert...
COMMIT; --everything above either entire happens or is entirely ROLLBACK'd
性能:
DELETE
--> TRUNCATE
批量插入(单个 INSERT
中的 1000 行)
BEGIN...COMMIT
LOAD DATA
而不是 INSERT
但是没有的性能技术会改变你遇到的问题——除了“巧合”。
为什么是 1650?
(或其他一些数字)InnoDB 的事务性质要求它挂在被删除或插入的行的先前副本上,直到COMMIT
(无论是显式还是自动提交)。这会使数据库中出现可能消失的“行”。因此,任何尝试航位推算确切行数的尝试都是不切实际的。
这导致使用不同的技术来估计行数。它是这样的:表占用了这么多磁盘,我们估计平均行是这么多字节。将它们相除以获得行数。
这导致您关于删除未完成的理论。就任何 SQL 而言,Delete is 已完成。但是,临时保存的 1000 行副本尚未从表中彻底清除。因此,行数的计算不精确。
锁定?
否锁定技术将“修复”1650。如果您不希望其他线程在运行 Delete+Insert 实验时插入/删除行,则需要锁定。为此,您应该使用锁定。
同时,如果您想要精确计数,必须使用COUNT(*)
。
【讨论】:
我使用的是 COUNT(*),所以 1650 的数字是准确的。【参考方案5】:防止此问题发生的正确方法是什么?
这不是问题,是访问数据库上同一个表的两个页面的预期行为。
我应该将所有内容包装在事务中,还是应该进行 1 次批量插入调用而不是 1000 次单独插入?显然我可以实现这两种解决方案,但我很好奇哪一种可以保证防止这个问题。
除了将插入的数量限制为 n000 由您运行的页数之外,不会产生盲目的差异。
场景 1 - 什么都不做
有两个页面一个接一个或在相似的时间运行。这就是为什么您看到 1650 条记录的原因是由于执行方法中的隐式事务,允许其他进程(在您的情况下为页面)访问表中的数据。
Action | Page a | Page b | Table Row count |
---|---|---|---|
1 | Deletes all Bobs | 0 | |
... | Insert a row | 1 | |
351 | Insert a row | Deletes all Bobs | 0 |
352 | Insert a row | Insert a row | 2 |
... | Insert a row | Insert a row | 4 |
1001 | Insert a row | Insert a row | 1298 |
1002 | Insert a row | 1299 | |
... | Insert a row | ... | |
1352 | Insert a row | 1650 |
因此插入了 1650 个 Bob。
场景 2 - 使用显式事务(乐观) |行动 |第一个 | b页|表行数 |交易 | | ----- | ------ | ------ | --- | --- | | 1 |开始| | 0 | | | 2 |删除所有 Bob |开始| 0 | (a-d0)| | 3 |插入 1000 行 |删除所有 Bob | 0| (a-d0-i1000)(b-d1000) | | 4 |提交 |插入 1000 行 | 1000 | (b-d1000-i1000) | | 5 | |提交 | 1000 |
场景 3 - 添加锁定 |行动 |第一个 | b页|表行数 | | ----- | ------ | ------ | --- | | 1 | AQ锁| | 0 | | 2 |开始| | 0 | | 3 |删除所有 Bob | AQ锁| 0 | | 4 |插入 1000 行 |无锁| 0| | 5 |提交 |没有锁| 1000 | | 6 | | AQ锁| 1000 | | 6 | |开始| 1000 | | 6 | |删除所有 Bob | 1000 (0) | | 6 | |插入 1000 行 | 1000 (1000) | | 6 | |解锁 | 1000 |
【讨论】:
【参考方案6】:如果有另一个实例,您可以检查服务器上的进程列表并阻止您的脚本执行。
【讨论】:
【参考方案7】:你点击deleteAndReInsert.php
两次,每个脚本有1001条命令,首先是删除所有name = Bob
,剩下的就是再次插入1000次Bob。
所以你完全有 2002 个命令,并且你没有声明让 mysql 明白你想要同步执行它的东西,你的 2002 命令将并发运行,并会导致意想不到的结果。 (超过 1000 个插入 name= Bob
)。这个过程可以这样描述:
->delete `name= bob` (clear count = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
->insert `name = bob`
->delete `name= bob` (the second time deleteAndReInsert.php
hit deleted at 300 times insert `name = bob` of first
time deleteAndReInsert.php -> clear count rows = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
-> insert `name = bob` (now it could be more than 1000 rows)
所以如果你想要结果是 1000 行。你必须让mysql明白:我想要同步运行的deleteAndReInsert.php,一步一步。并存档您可以执行以下解决方案之一:
-
使用
LOCK TABLE
语句锁定表并在完成时使用UNLOCK
,这使得第二个脚本无法对表执行任何操作,除非第一个脚本完成。
将所有内容包装在事务BEGIN COMMIT
中,然后mysql 将作为原子操作运行。 (好)
通过 redis (Redlock)、file .. 模拟 LOCK
以使您的操作同步运行 (Good)
希望能帮助你解决问题。
【讨论】:
【参考方案8】:您要做的是将LOCK TABLE ... WRITE
作为您工作的第一个声明,并将RELEASE TABLES
作为其最后一个声明。
那么这千行将被删除,然后插入,然后删除,然后再次插入。
但整个过程对我来说就像一个 XY 问题。您真正需要做什么?
因为我经常需要执行您所描述的类似操作(例如“刷新”一些摘要),而在这种情况下,我认为最好的方法是,两者都不 LOCK 也不 DELETE/INSERT,而是
INSERT INTO table
ON DUPLICATE KEY UPDATE ...
就我而言,如果我只需要添加或刷新记录,就足够了。
否则,我通常会添加一个“时间”字段,以便我识别刷新周期中“遗漏”的所有记录;那些——而且只有那些——在完成后被删除。
例如,假设我需要用复杂的 PHP 计算来计算许多客户的最大财务风险,然后插入到表格中以方便使用。每天晚上,每个客户都会刷新其值,然后在第二天使用“缓存”表。截断表格并重新插入所有内容很痛苦。
相反,我计算所有值并构建一个非常大的多个 INSERT 查询(如果需要,我可以将其拆分为 X 个更小的多个查询):
SELECT barrier:=NOW();
INSERT INTO `financial_exposures` ( ..., amount, customer_id, last_update )
VALUES
( ..., 172035.12, 12345, NOW()),
( ..., 123456.78, 12346, NOW()),
...
( ..., 450111.00, 99999, NOW())
ON DUPLICATE KEY UPDATE
amount=VALUES(amount),
last_update=VALUES(last_update);
DELETE FROM financial_exposures WHERE last_update < @barrier;
插入新客户,更新旧客户,除非他们的值不变(在这种情况下,MySQL 会跳过更新,节省时间),并且在每一时刻,始终存在一条记录 - 更新之前的记录,或者更新前的记录更新后。被移除的客户将在最后一步被移除。
当您有一个需要经常使用和更新的表时,这会更好。您可以添加不带锁的事务(INSERT
之前的SET autocommit = 0
,COMMIT WORK
之后的DELETE
),以确保所有客户端都可以看到整个更新,就像它立即发生一样。
【讨论】:
这是一种有趣的技术,但它依赖于唯一的数据库约束。我认为这样的解决方案最适合大多数现实生活中的问题。问题中的问题有点棘手,因为没有数据库约束,添加一个没有意义。【参考方案9】:@Pericodes 的回答是正确的,但是代码 sn-p 有错误。
您可以通过将代码包装在事务中来避免重复(不需要批量插入来停止重复)。
最好使用 1 个批量插入而不是 1000 个单独的插入,即使这不是必需的。
您可以通过运行此代码两次(几乎同时)进行测试,表格最终正好有 1000 条记录。
<?
$pdo->beginTransaction();
$query = $pdo->prepare("DELETE FROM t1 WHERE name=?");
$query->execute(['Bob']);
$query = $pdo->prepare("INSERT INTO t1 (name, age) VALUES (:name,:age)");
for ($i = 0; $i < 100; $i++)
$query->execute([ 'name' => 'Bob', 'age' => 34 ]);
$pdo->commit();
一些答案提到了锁(db 级和代码级),但对于这个问题来说,这些不是必需的,而且在 imo 中是多余的。
【讨论】:
以上是关于事务与批处理查询以避免重复的 MySQL 插入的主要内容,如果未能解决你的问题,请参考以下文章