使用多线程处理一组数据库记录的选项?

Posted

技术标签:

【中文标题】使用多线程处理一组数据库记录的选项?【英文标题】:Options to use multithreading to process a group of database records? 【发布时间】:2012-06-06 19:10:50 【问题描述】:

我有一个数据库表,其中包含一些要处理的记录。该表有一个标志列,表示以下状态值。 1 - 准备好处理,2 - 成功处理,3 - 处理失败。

.net 代码(重复过程 - 控制台/服务)将获取准备处理的记录列表,并循环通过它们并尝试处理它们(不是很长),根据成功或失败更新状态.

为了获得更好的性能,我想为此进程启用多线程。我正在考虑生成 6 个线程,每个线程抓取一个子集。

显然我想避免让不同的线程处理相同的记录。我不想在数据库中有一个“正在处理”标志来处理线程崩溃导致记录挂起的情况。

我看到这样做的唯一方法是获取可用记录的完整列表并为每个线程分配一个组(可能是 id)。如果一个单独的线程失败了,它的未处理记录将在下次进程运行时被拾取。

在将组分配给线程之前,是否有其他方法可以划分组?

【问题讨论】:

你的表中有标识列吗? 您真的不应该将数据库用作队列。 en.wikipedia.org/wiki/Database-as-IPC @Oded:我同意事务队列是一个更好的主意,但是您链接到的反模式说使用数据库是一种反模式,例如可以使用套接字。正确的想法,虽然不是一个完美的参考。 @Oded,***链接和那里发布的文章都没有为 db 作为队列提供真正的替代方案。编写“改为使用套接字”非常简单,但这意味着什么?您应该如何处理事务(成功或失败/重试),例如,使用套接字?人们没有在数据库中正确排队/处理项目的事实(正如那里的链接所解释的那样)并不意味着这个想法本身就是反模式,尤其是在 99% 的情况下,您希望已处理的项目保留在那里进行归档。跨度> @YavgenyP:确实,如果我没记错的话,事务模式下的 MSMQ 使用 SQL 服务器作为持久性机制 :-) 【参考方案1】:

实现此要求最直接的方法是使用任务并行库的

Parallel.ForEach(或Parallel.For)。

允许它管理单个工作线程。

根据经验,我会推荐以下内容:

具有附加状态“处理中” 在数据库中有一个列指示何时提取记录以进行处理,并有一个定期运行的清理任务/进程查找“处理”时间过长的记录(将状态重置为“准备处理” )。 即使您不希望它,“正在处理”对于崩溃恢复方案来说也是必不可少的(除非您可以容忍同一记录被处理两次)。

或者

考虑使用事务队列(想到 MSMQ 或 Rabbit MQ)。它们针对这个问题进行了优化。

这将是我的明确选择,因为我已经大规模完成了这两项工作。

优化

如果从数据库中检索数据需要花费大量时间,您可以考虑使用生产者/消费者模式,使用BlockingCollection 实现该模式非常简单。该模式允许一个线程(生产者)使用要处理的数据库记录填充队列,并允许多个其他线程(消费者)处理该队列中的项目。

一种新的选择

鉴于在记录被认为完成之前有几个处理步骤会触及记录,请查看Windows Workflow Foundation 作为可能的替代方案。

【讨论】:

崩溃恢复需要“正在处理”是什么意思。确实不是,如果记录状态没有更新为成功或失败,会再次处理。只有设置成功或失败后,才被视为已处理。在此之前,您可以一次又一次地播放它。我不希望有另一个进程希望重置“正在处理”标志,这可能并不完全准确,而且我有时也无法承受该进程重置标志以再次处理的延迟。 我也同意 MSMQ 是一个不错的选择。但是我在这里没有提到的一件事是,记录在被标记为成功或失败之前经历了许多阶段(由不同的进程处理),例如,初始化,移动到状态A,B,然后成功或失败。不确定队列是否仍然适用于这些场景。当涉及到这样的工作流程时,我在某处读到,db 更好。 @AlexJ:“只有设置成功或失败后,才视为已处理”表示您不需要额外的状态。许多系统不允许在没有一些回滚逻辑的情况下重新处理部分处理的记录。既然你可以容忍这一点,那么在这方面你很高兴。我会选择 Parallel.ForEach() 然后引入 BlockingQueue,当且仅当您花费大量时间从数据库中获取记录才能处理它们。 感谢您的所有解释。我想我仍然可以使用阻塞队列,因为我可以一次将所有“准备好的”放入队列并从阻塞集合中获得 Parallel.ForEach 进程。如果我错了,请纠正。 我也很想知道 Workflow 是如何做到这一点的。我做了一些研究,但不太明白如何使用它。完全是新手。还在找。。【参考方案2】:

我记得做了类似你描述的事情......一个线程不时检查数据库中是否有需要处理的新内容。它只会加载新的 id,所以如果在 x 时读取的最后一个 id 为 1000,则在 x+1 时将从 id 1001 读取。

它读取的所有内容都进入线程安全队列。将项目添加到此队列时,您会通知工作线程(可能使用自动重置事件,或在此处生成线程)。每个线程将一次从这个线程安全队列中读取一项,直到队列被清空。

您不应该在 foreach 线程工作之前分配(除非您知道 foreach 文件的进程花费相同的时间)。如果一个线程完成了工作,那么它应该从剩下的其他线程那里承担负载。使用这个线程安全队列,您可以确保这一点。

【讨论】:

如果使用内存队列,则无法从崩溃场景中恢复,除非可以多次处理同一记录。 确实如此,但如果他不能改变这三个状态呢? 啊,但他可以。他只是说他不想因为线程崩溃恢复,但实际上他需要启用恢复(假设不允许重播记录)。 好的。我假设另一个应用程序正在插入这些值,他不得不认为它们是理所当然的。然后就没事了:)【参考方案3】:

这是一种不依赖/使用额外的数据库列(但请参阅#4)或要求进程内队列的方法。 这种方法的前提是基于一些一致的值在工作人员之间“分片”记录,就像分布式缓存一样。

这是我的假设:

    重新处理不会导致不必要的副作用;最多“浪费”一些工作。 线程数在启动时是固定的。这不是必需的,但它确实简化了实现,并允许我在下面的简单描述中跳过暂时的细节。 只有一个“工作进程”(但请参见 #1)控制“工作线程”。这简化了如何在工作人员之间拆分记录的处理。 有一些[不可变的]“ID”列“分布良好”。这是必需的,这样搜索工作者才能获得大致相同的工作量。 只要“最终完成”,就可以“乱序”完成工作。此外,工作人员可能并不总是“100%”运行,因为每个工作人员都有效地在不同的队列上工作。

[0, thread_count) 为每个线程分配一个唯一的bucket 值。如果一个线程死亡/重新启动,它将占用与它腾出的相同的桶。

然后,每次线程需要一条新记录时,它都会从数据库中获取:

SELECT *
FROM record
WHERE state = 'unprocessed'
AND (id % $thread_count) = $bucket
ORDER BY date

当然,对于批量读取“此线程任务”并将它们存储在本地可能还有其他假设。然而,本地队列将是每个线程(因此在新线程启动时重新加载),因此它只会处理与给定bucket 关联的记录。

当线程完成处理记录时,应使用适当的隔离级别和/或乐观并发将记录标记为已处理,然后继续处理下一条记录。

【讨论】:

为什么要自己管理线程? TPL 在这方面做得非常好......另外,这需要多次往返数据库,否则就可以这样做。 @EricJ。我试图避免指定线程的实现。这种方法的想法是根据一些一致的值跨工作线程“分片”记录,就像分布式缓存一样。 但是您指定实现将不会是 Parallel.For(Each),因为两者都会自动管理线程生命周期并且不会混合您的第二个假设。跨度> @EricJ。不,这是一个错误的论点。我概述了一些假设。在给定的约束内工作或重新定义/更改它们取决于实现。 最后一句话...您的第二个假设使得无法使用 Parallel.For(Each),并且确实牺牲了利用专门为此类问题设计的强大而简单的工具的能力。我就这样吧:-)

以上是关于使用多线程处理一组数据库记录的选项?的主要内容,如果未能解决你的问题,请参考以下文章

使用多线程 + 多处理的 Python 日志记录

C# 多线程应用程序和 SQL 连接帮助

winform 利用 多线程 处理窗体假死,利用 Invoke BeginInvoke 处理子线程调用 UI 控件报错的问题

C#多线程处理

多线程

使用阻塞式队列处理大数据