使用多个 Datareader 连接进行批处理
Posted
技术标签:
【中文标题】使用多个 Datareader 连接进行批处理【英文标题】:Batch processing using multiple Datareader connections 【发布时间】:2021-02-03 19:51:23 【问题描述】:我有一个包含超过 100000 条记录的表。我想读取数据,处理它们,然后如果处理成功则删除该行,如果失败则使用错误代码更新该行。
我的方法是一次获取 1000 条记录,将其加载到数据表中,然后将其传递给在 5 个并行线程中运行进程的函数。处理完前 1000 条记录后,打开一个新的 DataReader 并处理接下来的 1000 条......等等......
我的方法看起来正确吗?我遇到的问题是代码运行良好,但 1000 条记录没有完全处理。处理了大约 300 条记录,其余的仍然存在。我在这里做错了什么?
using (SqlDataReader rdr = cmd.ExecuteReader)
if (rdr.HasRows)
Datatable dt = new Datatable();
dt.Load(rdr);
//process data
results = Process(dt);
public List<string> Process(Datatable dt)
var options = new ParallelOptions()
MaxDegreeOfParallelism = 5
List<string> results = new List<string>();
Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, (trans) =>
//process and then delete/update row
....
results.Add(transResult);
);
return results
【问题讨论】:
从多个线程同时在results
上调用Add
是不安全的。在并行作业完成后使用ConcurrentBag
并调用ToList()
,或者使用lock
语句包装Add
这种方法是错误的,有几个原因。 Parallel.ForEach
仅用于使用所有可用内核处理大量内存数据,不用于并行运行异步 IO 操作。无论process and then delete/update row
做什么,在某些时候它都会与数据库通信,一次慢行,在等待数据库响应时阻塞 CPU 内核
您执行什么样的处理?这很重要。您可以使用 Dataflow 类(如 TransformBlock、ActionBlock 和 BatchBlock)来读取流中的源数据,处理每一行并批处理结果,然后再将它们批量写入数据库,避免 单个行修改所需的网络往返
你在做什么处理?这一切都可以在 SQL 中完成吗? @PanagiotisKanavos lambda 中没有完成 IO,它正在内存中工作 DataTable
//process and then delete/update row
DataRow 所经历的处理类型的信息?是I/O-bound or CPU-bound吗?
【参考方案1】:
线程共享相同的资源,因此会导致 dbContext 出现问题。所以最好在 Parallel.ForEach 中做锁
Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, (trans) =>
var obj=new object();
lock(obj)
//...your processing
);
【讨论】:
锁定对象的创建必须脱离循环。那么它应该可以工作,但它可能会对性能产生很大影响。以上是关于使用多个 Datareader 连接进行批处理的主要内容,如果未能解决你的问题,请参考以下文章
MysqlException 未处理 DataReader 必须关闭此连接 vb.net
DataReader + MySql 连接器 + Dispose