将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法相关的知识,希望对你有一定的参考价值。

我有一个包含~50K行的数据库表,每行代表一个需要完成的工作。我有一个程序从数据库中提取作业,完成工作并将结果放回到数据库中。 (这个系统现在正在运行)

现在我想允许多个处理任务来完成工作,但要确保没有任务完成两次(作为性能问题而不是这将导致其他问题)。因为访问是通过sproce进行的,我现在的目的是用看起来像这样的东西替换所说的sproce

update tbl set owner=connection_id() where avalable and owner is null limit 1;
select stuff from tbl where owner = connection_id();

BTW;工作人员的任务可能会在获得工作和提交结果之间失去联系。此外,我不认为DB甚至会接近瓶颈,除非我把那部分搞砸了(每分钟约5个工作)

这有什么问题吗?有一个更好的方法吗?

注意:"Database as an IPC anti-pattern"在这里只是略微适用,因为1)我没有做IPC(没有生成行的过程,它们现在都已存在)和2)针对该反模式描述的主要抱怨是它的结果当进程等待消息时,在DB上不需要的负载(在我的情况下,如果没有消息,一切都可以在一切都完成时关闭)

答案

这是我过去成功使用的内容:

MsgQueue表模式

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

您的消息类型是您所期望的 - 符合插入过程和进程(读取)之间的合同的消息,使用XML或您的其他表示形式构建(在某些情况下,JSON会很方便,因为实例)。

然后可以插入0到n进程,0到n进程可以读取和处理消息。每个读取进程通常处理单个消息类型。可以运行多个进程类型实例以进行负载平衡。

读取器拉出一条消息,并在其上工作时将状态更改为“A”。完成后,它将状态更改为“C”完成。它可以删除或不删除消息,具体取决于您是否要保留审计跟踪。 State ='N'的消息是以MsgType / Timestamp顺序提取的,因此在MsgType + State + CreateTime上有一个索引。

变化: 陈述“E”恐怖。 Reader进程代码列。 状态转换的时间戳

这提供了一个很好的,可扩展的,可见的,简单的机制,用于执行您正在描述的许多事情。如果您对数据库有基本的了解,那么它非常简单且可扩展。


评论代码:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN
另一答案

在关系数据库系统中实现作业队列的最佳方法是使用SKIP LOCKED

SKIP LOCKED是一种锁定获取选项,适用于读取/共享(FOR SHARE)或写入/独占(FOR UPDATE)锁定,现在得到广泛支持:

  • Oracle 10g及更高版本
  • PostgreSQL 9.5及更高版本
  • SQL Server 2005及更高版本
  • mysql 8.0及更高版本

现在,考虑我们有以下用作作业队列的post表:

CREATE TABLE post (
    id int8 NOT NULL,
    body varchar(255),
    status int4,
    title varchar(255),
    PRIMARY KEY (id)
)

status列用作Enum,具有PENDING(0),APPROVED(1)或SPAM(2)的值。

如果我们有多个并发用户试图调整post记录,我们需要一种方法来协调他们的努力,以避免让两个主持人审查相同的post行。

所以,SKIP LOCKED正是我们所需要的。如果两个并发用户Alice和Bob执行以下SELECT查询,这些查询将独占锁定帖子记录,同时还添加SKIP LOCKED选项:

    [Alice]:
    SELECT
        p.id AS id1_0_,

    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

我们可以看到Alice可以选择前两个条目,而Bob选择接下来的两个条目。如果没有SKIP LOCKED,Bob锁定获取请求将被阻止,直到Alice释放前2条记录的锁定。

有关SKIP LOCKED的更多详情,请查看this article

另一答案

正如可能的技术变化,您可能会考虑使用MSMQ或类似的东西。

您的每个作业/线程都可以查询消息队列以查看是否有新作业可用。因为读取消息的行为将其从堆栈中删除,所以确保只有一个作业/线程可以获取消息。

当然,这是假设您正在使用Microsoft平台。

另一答案

如果不拥有owner = null,则应将其设置为伪无人记录。搜索null不会限制索引,最终可能会进行表扫描。 (这是为oracle,SQL服务器可能会有所不同)

另一答案

您正在尝试实现de“Database as IPC”反模式。查一下,了解为什么要考虑重新设计软件。

以上是关于将数据库表用作作业队列(a.k.a批处理队列或消息队列)的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

消息队列概念

rabbitmq和redis用作消息队列的区别

rabbitmq和redis用作消息队列的区别

rabbitmq和redis用作消息队列的区别

消息未发送到 RTE 上的回复队列

使用消息队列的理由