将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法相关的知识,希望对你有一定的参考价值。
我有一个包含~50K行的数据库表,每行代表一个需要完成的工作。我有一个程序从数据库中提取作业,完成工作并将结果放回到数据库中。 (这个系统现在正在运行)
现在我想允许多个处理任务来完成工作,但要确保没有任务完成两次(作为性能问题而不是这将导致其他问题)。因为访问是通过sproce进行的,我现在的目的是用看起来像这样的东西替换所说的sproce
update tbl set owner=connection_id() where avalable and owner is null limit 1;
select stuff from tbl where owner = connection_id();
BTW;工作人员的任务可能会在获得工作和提交结果之间失去联系。此外,我不认为DB甚至会接近瓶颈,除非我把那部分搞砸了(每分钟约5个工作)
这有什么问题吗?有一个更好的方法吗?
注意:"Database as an IPC anti-pattern"在这里只是略微适用,因为1)我没有做IPC(没有生成行的过程,它们现在都已存在)和2)针对该反模式描述的主要抱怨是它的结果当进程等待消息时,在DB上不需要的负载(在我的情况下,如果没有消息,一切都可以在一切都完成时关闭)
这是我过去成功使用的内容:
MsgQueue表模式
MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable
您的消息类型是您所期望的 - 符合插入过程和进程(读取)之间的合同的消息,使用XML或您的其他表示形式构建(在某些情况下,JSON会很方便,因为实例)。
然后可以插入0到n进程,0到n进程可以读取和处理消息。每个读取进程通常处理单个消息类型。可以运行多个进程类型实例以进行负载平衡。
读取器拉出一条消息,并在其上工作时将状态更改为“A”。完成后,它将状态更改为“C”完成。它可以删除或不删除消息,具体取决于您是否要保留审计跟踪。 State ='N'的消息是以MsgType / Timestamp顺序提取的,因此在MsgType + State + CreateTime上有一个索引。
变化: 陈述“E”恐怖。 Reader进程代码列。 状态转换的时间戳。
这提供了一个很好的,可扩展的,可见的,简单的机制,用于执行您正在描述的许多事情。如果您对数据库有基本的了解,那么它非常简单且可扩展。
评论代码:
CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) )
AS
DECLARE @MsgId INT
BEGIN TRAN
SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue
WHERE MessageType = @pMessageType AND State = 'N'
ORDER BY CreateTime
IF @MsgId IS NOT NULL
BEGIN
UPDATE MsgQueue
SET State = 'A'
WHERE MsgId = @MsgId
SELECT MsgId, Msg
FROM MsgQueue
WHERE MsgId = @MsgId
END
ELSE
BEGIN
SELECT MsgId = NULL, Msg = NULL
END
COMMIT TRAN
在关系数据库系统中实现作业队列的最佳方法是使用SKIP LOCKED。
SKIP LOCKED是一种锁定获取选项,适用于读取/共享(FOR SHARE)或写入/独占(FOR UPDATE)锁定,现在得到广泛支持:
- Oracle 10g及更高版本
- PostgreSQL 9.5及更高版本
- SQL Server 2005及更高版本
- mysql 8.0及更高版本
现在,考虑我们有以下用作作业队列的post
表:
CREATE TABLE post (
id int8 NOT NULL,
body varchar(255),
status int4,
title varchar(255),
PRIMARY KEY (id)
)
status
列用作Enum,具有PENDING(0),APPROVED(1)或SPAM(2)的值。
如果我们有多个并发用户试图调整post
记录,我们需要一种方法来协调他们的努力,以避免让两个主持人审查相同的post
行。
所以,SKIP LOCKED正是我们所需要的。如果两个并发用户Alice和Bob执行以下SELECT查询,这些查询将独占锁定帖子记录,同时还添加SKIP LOCKED选项:
[Alice]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
[Bob]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
我们可以看到Alice可以选择前两个条目,而Bob选择接下来的两个条目。如果没有SKIP LOCKED,Bob锁定获取请求将被阻止,直到Alice释放前2条记录的锁定。
有关SKIP LOCKED的更多详情,请查看this article。
正如可能的技术变化,您可能会考虑使用MSMQ或类似的东西。
您的每个作业/线程都可以查询消息队列以查看是否有新作业可用。因为读取消息的行为将其从堆栈中删除,所以确保只有一个作业/线程可以获取消息。
当然,这是假设您正在使用Microsoft平台。
如果不拥有owner = null,则应将其设置为伪无人记录。搜索null不会限制索引,最终可能会进行表扫描。 (这是为oracle,SQL服务器可能会有所不同)
您正在尝试实现de“Database as IPC”反模式。查一下,了解为什么要考虑重新设计软件。
以上是关于将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章