有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?
Posted
技术标签:
【中文标题】有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?【英文标题】:Is there a way to use the Task Parallel Library(TPL) with SQLDataReader? 【发布时间】:2011-03-07 00:28:36 【问题描述】:我喜欢 TPL 中 Parallel.For 和 Parallel.ForEach 扩展方法的简单性。我想知道是否有办法利用类似的东西,甚至是稍微高级一点的任务。
下面是 SqlDataReader 的典型用法,我想知道这是否可行,如果可行,如何用 TPL 中的内容替换下面的 while 循环。因为读者不能提供固定数量的迭代,所以 For 扩展方法是不可能的,这让我处理我收集的任务。我希望有人可能已经解决了这个问题,并通过 ADO.net 制定了一些注意事项。
using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
conn.Open();
SqlDataReader reader = comm.ExecuteReader();
if (reader.HasRows)
while (reader.Read())
// Do something with Reader
【问题讨论】:
【参考方案1】:您将很难直接替换该 while 循环。 SqlDataReader 不是 一个线程安全的类,所以你不能直接从多个线程中使用它。
话虽如此,您可能处理您使用 TPL 读取的数据。这里有几个选项。最简单的方法可能是制作您自己的 IEnumerable<T>
实现,该实现适用于阅读器,并返回包含您的数据的类或结构。然后,您可以使用 PLINQ 或 Parallel.ForEach
语句并行处理您的数据:
public IEnumerable<MyDataClass> ReadData()
using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
conn.Open();
SqlDataReader reader = comm.ExecuteReader();
if (reader.HasRows)
while (reader.Read())
yield return new MyDataClass(... data from reader ...);
一旦你有了那个方法,你就可以通过 PLINQ 或 TPL 直接处理它:
Parallel.ForEach(this.ReadData(), data =>
// Use the data here...
);
或者:
this.ReadData().AsParallel().ForAll(data =>
// Use the data here...
);
【讨论】:
【参考方案2】:你快到了。使用此签名将您发布在函数中的代码包装起来:
IEnumerable<IDataRecord> MyQuery()
然后将您的 // Do something with Reader
代码替换为:
yield return reader;
现在你有了可以在单线程中工作的东西。不幸的是,当您阅读查询结果时,它每次都会返回对 same 对象的引用,并且该对象只会在每次迭代时自行改变。这意味着如果你尝试并行运行它,你会得到一些非常奇怪的结果,因为并行读取会改变不同线程中使用的对象。您需要代码来获取记录的副本,以发送到您的并行循环。
不过,在这一点上,我喜欢做的是跳过记录的额外副本,直接进入强类型类。不仅如此,我更喜欢使用通用方法来做到这一点:
IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
using (var cn = new SqlConnection("My connection string"))
using (var cmd = new SqlCommand(sql, cn))
addParameters(cmd.Parameters);
cn.Open();
using (var rdr = cmd.ExecuteReader())
while (rdr.Read())
yield return factory(rdr);
假设您的工厂方法按预期创建副本,则此代码在 Parallel.ForEach 循环中使用应该是安全的。调用该方法看起来像这样(假设一个 Employee 类具有一个名为“Create”的静态工厂方法):
var UnderPaid = GetData<Employee>(Employee.Create,
"SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary",
p =>
p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
);
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
重要更新: 我对这段代码没有以前那么自信了。当另一个线程正在复制它时,一个单独的线程仍然可以改变阅读器。我可以锁定它,但我也担心另一个线程可能会在原件本身调用 Read() 之后但在它开始制作副本之前调用更新阅读器。因此,这里的关键部分由整个 while 循环组成……此时,您又回到了单线程。我希望有一种方法可以修改此代码,使其在多线程场景中按预期工作,但需要更多研究。
【讨论】:
你所说的大部分内容我都支持你,你在工厂里失去了我一点。 Func以上是关于有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章