如何避免从 LINQ to Entities 中的多个线程从 SQL 数据库中读取相同的记录

Posted

技术标签:

【中文标题】如何避免从 LINQ to Entities 中的多个线程从 SQL 数据库中读取相同的记录【英文标题】:How to avoid reading same records from SQL Database from multiple threads in LINQ to Entities 【发布时间】:2021-11-22 23:24:16 【问题描述】:

我们正在 Windows 服务的多线程环境中处理单个文件,其中每 10 秒一个新线程将开始处理未处理的文件。

await Task.Run(()=> ProcessFilesFromDatabase());

有时我们会遇到两个线程读取相同文件的问题,并且我们会遇到重复文件被处理的问题。当我们在线检查在这种情况下如何在 LINQ to Entities 中克服时,我们被建议使用transaction scope and readcommitted

using (var context = new Context())
        
            var transActionOption1 = new TransactionOptions();
            transActionOption1.IsolationLevel = System.Transactions.IsolationLevel.ReadCommitted;
            using (TransactionScope tran = new TransactionScope(TransactionScopeOption.Required, transActionOption1))
            
            

                try
                
                       var fileInfo = context.File.Where(a=>!a.IsRead).ToList();
                       foreach (FileDetails fileDetail in fileInfo) 
                       
                           fileDetail.IsRead = true;
                           context.Entry(fileDetail).State = EntityState.Modified;
                           context.savechanges();
                       
                      
                       tran.Complete();
                
               //Process each file after updating read
            
          

但线程 1 仍然读取 5 条记录,并且在使用 IsRead - true 标志更新回数据库之前,线程 2 读取相同的 5 条记录。

我们相信,在 ReadCommitted 范围内,任何未提交的记录都不会被其他线程读取。

谢谢,

塞尔瓦

【问题讨论】:

为什么要每 10 秒启动一个新线程呢?也许只使用一个线程并每 10 秒检查一次以防止重叠? 是的,如果线程 1 处理文件 1 分钟同时下一组文件到达意味着我们需要等待 60 秒来处理下一组文件。如果间隔为 10 秒,我可以新启动新线程来启动新文件或未处理的文件 那么最好让一个线程每 10 秒检查一次新文件,并将要处理的内容放入队列中(在内存中,如 ConcurrentQueue)。然后一个或多个其他线程从这个队列中拉出项目并处理它们。这将比尝试同时处理数据库中的同一组项目更容易和可靠。 几乎所有的代码要么是不需要的,要么是有害的。当您调用SaveChanges 时,DbContext 会缓存更改并将所有更改保存在单个事务中。在每次修改后调用 SaveChanges 几乎总是一个错误。如果你不这样做,你就不需要 TransactionScope。至于轮询更改 - 不要那样做。有更好的方法,从 not 使用数据库进行通知开始。并且不使用 ORM 来加载更改 对于初学者,使用一个单个线程来轮询数据库并让它为工作人员提供数据,例如,通过一个Channel。 SQL Server 在所有版本和版本中都支持Change Tracking,这使您可以廉价地读取上次检查后修改的任何记录。 【参考方案1】:

这种设计有几个问题:

EF 和任何 ORM 都不适合此类查询。为每一行调用SaveChanges会增加发生冲突的机会。 轮询在同一个线程上处理数据几乎可以保证从一个作业执行到下一个作业执行会发生冲突

获取更改

在 SQL Server 中,可以从使用 OUTPUT 子句修改它们的查询中返回修改后的数据,例如:

UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;

在 EF Core 中,可以使用FromSqlRaw 将结果映射到实体,例如:

List<File> GetNewFiles()

    var sql=@"UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;";
    using(var context=new MyContext())
    
        var newFiles=context.Files.FromSqlRaw(sql)...ToList();
        return newFiles;
    

EF 仅用于在此处加载数据。使用像 Dapper 这样的 microORM 来执行查询并直接映射结果可能更简单

List<File> GetNewFiles(string connectionString)

    var sql=@"UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;";
    using(var connection=new SqlConnection(connectionString))
    
        var newFiles=connection.Query<File>(sql).ToList();
        return newFiles;
    

将轮询与处理分开

为了完全避免冲突,轮询和处理步骤应该分开。一种方法是将投票结果发布到例如 ActionBlock 或 Channel 并让工作人员处理它们。

假设处理方法是ProcessFiles(List&lt;File&gt;)。 ActionBlock 可用于处理使用该方法发布到它的任何文件。

var block=new ActionBlock<List<File>>(files=>ProcessFiles(files));

计时器可用于轮询数据库并将任何新文件发布到块

var timer=new Timer(()=>
    var files=GetNewFiles(connectionString);
    block.Post(files)
);

timer.Change(0,10000);

这足以开始轮询数据库并处理不同数据库中的文件。

要停止处理,首先我们停止计时器,然后我们告诉块停止并等待它完成:

timer.Dispose();
block.Complete();
await block.Completion;

【讨论】:

以上是关于如何避免从 LINQ to Entities 中的多个线程从 SQL 数据库中读取相同的记录的主要内容,如果未能解决你的问题,请参考以下文章

如何修复 ApplicationUserManager 中的“LINQ to Entities 中不支持指定的类型成员‘UserId’”

将日期时间转换为 LINQ-to-entities 查询中的格式化字符串

Linq-to-Entities:带有 WHERE 子句和投影的 LEFT OUTER JOIN

使用 LINQ to Entities 的内部查询(子查询)返回列表中的 IEnumerable 列表

这是如何运作的? LINQ to Entities 触发程序集的加载?

Linq to Entities - SQL“IN”子句