有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?

Posted

技术标签:

【中文标题】有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?【英文标题】:Is there a way to use the Task Parallel Library(TPL) with SQLDataReader? 【发布时间】:2011-03-07 00:28:36 【问题描述】:

我喜欢 TPL 中 Parallel.For 和 Parallel.ForEach 扩展方法的简单性。我想知道是否有办法利用类似的东西,甚至是稍微高级一点的任务。

下面是 SqlDataReader 的典型用法,我想知道这是否可行,如果可行,如何用 TPL 中的内容替换下面的 while 循环。因为读者不能提供固定数量的迭代,所以 For 扩展方法是不可能的,这让我处理我收集的任务。我希望有人可能已经解决了这个问题,并通过 ADO.net 制定了一些注意事项。

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))

    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    
        while (reader.Read())
        
            // Do something with Reader
        
    

【问题讨论】:

【参考方案1】:

您将很难直接替换该 while 循环。 SqlDataReader 不是 一个线程安全的类,所以你不能直接从多个线程中使用它。

话虽如此,您可能处理您使用 TPL 读取的数据。这里有几个选项。最简单的方法可能是制作您自己的 IEnumerable<T> 实现,该实现适用于阅读器,并返回包含您的数据的类或结构。然后,您可以使用 PLINQ 或 Parallel.ForEach 语句并行处理您的数据:

public IEnumerable<MyDataClass> ReadData()

    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        
            while (reader.Read())
            
                yield return new MyDataClass(... data from reader ...);
            
        
    

一旦你有了那个方法,你就可以通过 PLINQ 或 TPL 直接处理它:

Parallel.ForEach(this.ReadData(), data =>

    // Use the data here...
);

或者:

this.ReadData().AsParallel().ForAll(data => 

    // Use the data here...
);

【讨论】:

【参考方案2】:

你快到了。使用此签名将您发布在函数中的代码包装起来:

IEnumerable<IDataRecord> MyQuery()

然后将您的 // Do something with Reader 代码替换为:

yield return reader;

现在你有了可以在单线程中工作的东西。不幸的是,当您阅读查询结果时,它每次都会返回对 same 对象的引用,并且该对象只会在每次迭代时自行改变。这意味着如果你尝试并行运行它,你会得到一些非常奇怪的结果,因为并行读取会改变不同线程中使用的对象。您需要代码来获取记录的副本,以发送到您的并行循环。

不过,在这一点上,我喜欢做的是跳过记录的额外副本,直接进入强类型类。不仅如此,我更喜欢使用通用方法来做到这一点:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)

    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        
            while (rdr.Read())
            
                yield return factory(rdr);
            
        
    

假设您的工厂方法按预期创建副本,则此代码在 Parallel.ForEach 循环中使用应该是安全的。调用该方法看起来像这样(假设一个 Employee 类具有一个名为“Create”的静态工厂方法):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => 
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       );
Parallel.ForEach(UnderPaid, e => e.GiveRaise());

重要更新: 我对这段代码没有以前那么自信了。当另一个线程正在复制它时,一个单独的线程仍然可以改变阅读器。我可以锁定它,但我也担心另一个线程可能会在原件本身调用 Read() 之后但在它开始制作副本之前调用更新阅读器。因此,这里的关键部分由整个 while 循环组成……此时,您又回到了单线程。我希望有一种方法可以修改此代码,使其在多线程场景中按预期工作,但需要更多研究。

【讨论】:

你所说的大部分内容我都支持你,你在工厂里失去了我一点。 Func 工厂与 yeild return factor(rdr) 一起使用时与调用不匹配我认为您的意思是 Func。所以不确定你所说的按预期复制是什么意思。你的意思是基本上从读者那里阅读并返回一个类似于 Reed 在他的回复中所说的 MyDataClass? 看起来你的 GetData 调用是我们的命令,你在 sql 字符串之前有工厂函数。不管我想我明白了,你的 Employee.Create 是你的工厂,它与读者一起完成所需的工作。我会玩一会儿,看看效果如何。 是的,我的意思是 Func。将解决这个问题和参数不匹配。 效果很好。感谢您的帮助,我喜欢 GetData 方法,无论是否使用线程,它都能很好地工作。 这里有一些不错的功能代码,但至于并行运行的好处,我不确定是否有。瓶颈可能是实际的 db 调用,它不是并行运行的。

以上是关于有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

三分钟总览微软任务并行库TPL

是否可以在 C# 2.0 中使用任务并行库 (TPL)?

理解并行编程

线程可以做啥,而基于任务的异步模式(TAP)和任务并行(TPL)与任务(或任务<T>)不能做啥?

TPL异步并行编程之任务超时

使用任务并行库的更好方法是啥