我可以将 Parallel.For 与 sql 命令一起使用吗?

Posted

技术标签:

【中文标题】我可以将 Parallel.For 与 sql 命令一起使用吗?【英文标题】:Can I use Parallel.For with sql Commands? 【发布时间】:2017-06-13 14:38:25 【问题描述】:

我有一个范围类

public class avl_range

    public long start  get; set; 
    public long end  get; set; 

如果我使用普通的FOR 效果很好,但必须等待每个命令完成并且每个查询需要 8 秒,所以 10 个查询需要 80 秒。

在并行版本中,如果我只打印范围可以完美,但如果尝试执行命令说已经在进行中。

“一个操作已经在进行中。”

我该如何解决这个问题?

var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
    
        conn.Open();

        Action<avl_range> forEachLoop = number => //Begin definition of forLoop
        
             // only the console write line works ok
            Console.WriteLine(number.start + " - " + number.end);

            using (var cmd = new NpgsqlCommand())
            
                cmd.Connection = conn;                            
                cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link(0, 1);"
                                                 , number.start
                                                 , number.end);
                // here cause the error.
                using (var reader = cmd.ExecuteReader())
                
                    while (reader.Read())
                    
                        Console.WriteLine(reader.GetString(0));
                    
                
            
        ;

        Parallel.ForEach(numbers, forEachLoop);
    
 );

仅供参考:我正在尝试解决此问题,我将其发布before

【问题讨论】:

在微软的 SQL server 中,这会导致错误,因为在同一个连接上不能有多个打开的记录集(不启用 MARS)。我不知道您的 SQL 提供程序是否以相同的方式实现,但我猜很有可能。 在并行 foreach 中创建一个新的NpgsqlConnection 怎么样? @BradleyUffner 你能提供一些文件吗?我可以尝试,但对我来说似乎没有逻辑为每个查询创建不同的连接。 这适用于微软的 SQL 实现。我不能肯定地说 postressql,因为我从未使用过它,但值得一试。只要它汇集连接,它就不应该有太多的性能损失。 这个***.com/questions/8803733/… 似乎表明prostgresql 没有MARS 等效功能。 【参考方案1】:

不能同时使用 Npgsql 连接 - 在任何给定时间点只能运行一个命令(换句话说,不支持 MARS)。

打开多个连接以并行执行查询绝对是有意义的。尽管建立一个新的物理连接很昂贵,但连接池非常轻量级,因此重用物理连接的开销很小。不这样做的主要原因是如果您需要多个操作在同一个事务中。

【讨论】:

你确定 Npgsql 实现了连接池吗?是否支持它取决于 ADO.NET 提供者。老实说,我不知道他们是否知道答案。 是的,我就是写这篇文章的人 :) 哦,那好吧:) 嗨,Shay,我在 Npgsql 论坛中 dropyghost :)。我只是将连接创建移动到forEachLoop 中,现在正在工作。没关系,还是我应该做点别的,因为你提到重用连接? @JuanCarlosOropeza 如果您的所有代码都写入控制台,这可能是减慢您速度的部分。控制台是最慢的。使用没有并行的普通 foreach 循环再次尝试,但将字符串存储在列表中,而不是将它们写入控制台并注释掉 Console.WriteLine(number.start + " - " + number.end);,然后让我们知道您的时间。【参考方案2】:

即使你可以让它与 MARS 一起工作,连接对象也几乎从来都不是线程安全的,你需要每个线程都有一个连接。 Parallel.ForEach has overloads to make this easy 具有在线程开始和结束时运行的函数。

var numbers = new List<avl_range>();

Func<NpgsqlConnection> localInit => () => 

    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
;

Action<NpgsqlConnection> localFinally = (conn) => conn.Dispose();

Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) => //Begin definition of forLoop

     // only the console write line works ok
    Console.WriteLine(number.start + " - " + number.end);

    using (var cmd = new NpgsqlCommand())
    
        cmd.Connection = conn;                            
        cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link(0, 1);"
                                         , number.start
                                         , number.end);
        // here cause the error.
        using (var reader = cmd.ExecuteReader())
        
            while (reader.Read())
            
                Console.WriteLine(reader.GetString(0));
            
        
    
    return conn;
;

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

话虽如此,在大多数情况下,与数据库进行并发连接并不是正确的想法,瓶颈可能在其他地方,您应该使用分析器来查看真正减慢程序速度的原因并将您的工作重点放在那里。


cmets 示例代码:

var numbers = GetDataForNumbers();
List<string> results = new List<string>();

Func<List<string>> localInit => () => new List<string>();

Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) => //Begin definition of forLoop

    using (var conn = new NpgsqlConnection(strConnection))
    
        conn.Open();

        //This line is going to slow your program down a lot, so i commented it out.
        //Console.WriteLine(number.start + " - " + number.end);

        using (var cmd = new NpgsqlCommand())
        
            cmd.Connection = conn;                            
            cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link(0, 1);"
                                             , number.start
                                             , number.end);
            using (var reader = cmd.ExecuteReader())
            
                while (reader.Read())
                
                    //Add a object to the thread local list, we don't need to lock here because we are the only thread with access to it.
                    localList.Add(reader.GetString(0));
                
            
        
    
    return localList;
;

Action<List<String>> localFinally = localList => 

    //Combine the local list to the main results, we need to lock here as more than one thread could be merging at once.
    lock(results)
    
        results.AddRange(localList);
    
;

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

//results now contains strings from all the threads here.

【讨论】:

谢谢 Scott,这个设置和移动里面的创建连接有什么区别吗? 如果 NpgsqlConnection 不支持连接池,这种方式会更快,因为它会创建更少的总连接(单个线程可能会被重复用于主体的多次迭代)。然而,由于 Shay 确认它确实进行了连接池,我宁愿将它放在 foreach 主体中,因为它使代码更加简单易读,并且具有完全相同的性能。所有这些代码所做的就是通过为每个线程建立一个连接来创建一个类似“池”的新效果,因为提供者有自己的池,我们不需要自己创建。 好的,那么我将保留using ....,但很高兴知道localInit, localFinally 可能在稍后的其他步骤中有用。 一个常见的用例是你在 foreach 之外创建一个像 var combinedList = new List&lt;T&gt; 这样的对象,然后在 localinit 中你也做一个 return new List&lt;T&gt; 并写入循环体中的本地列表。在您的 localFinally 中,您执行 localList =&gt; lock(combinedList) combinedList.AddRange(localList); 以线程安全的方式将单个线程列表合并到主列表中,然后您唯一拥有锁的地方是 localFinally 并且在循环体中没有锁。 是的,这正是我需要的。你能给我推荐一个工作示例代码吗?

以上是关于我可以将 Parallel.For 与 sql 命令一起使用吗?的主要内容,如果未能解决你的问题,请参考以下文章

测量Parallel.For的执行时间

在Parallel.For中,是否可以同步每个线程?

我可以在 Parallel.For 循环中使用相同的函数委托吗

Parallel.For 和 ConcurrentBag 给出不可预测的行为

是否可以在 Parallel.For 中定义执行顺序?

Parallel.for 循环执行