使用队列在两个 BackgroundWorker 之间传递数据
Posted
技术标签:
【中文标题】使用队列在两个 BackgroundWorker 之间传递数据【英文标题】:Using a Queue to pass data between two BackgroundWorkers 【发布时间】:2019-10-04 14:29:36 【问题描述】:我正在逐字节读取和解码二进制文件。为此,我使用了两个 BackgroundWorker
s :一个用于读取文件,它为我的文件的每个“行”生成一个可变大小的 List<byte>
,另一个用于处理“行”。
由于我希望它们并行运行,而我不知道哪个会比另一个更快,所以我使用Queue
在两个BackgroundWorker
s 之间传递数据。
事情是这样的:任何时候List<byte>
都不应该包含任何0
值。我检查之前将它们添加到队列中。尽管如此,在Queue
的另一端,一些列表包含0
值。然而,我在每次调用Dequeue()
时创建一个新的List<byte>
,因为显然,如果我不这样做,数据会在处理完成之前被修改。
我尝试手动创建一个新的List<byte>
对象并然后为其分配Dequeue()
的结果,但没有改进。这是我第一次使用Queue
,由于我的代码是多线程的,因此几乎不可能逐步调试。
Queue<List<byte>> q = new Queue<List<byte>>(); // My FIFO queue
// Reading thread
private void BackgroudWorkerRead_DoWork(object sender, DoWorkEventArgs e)
// ... read the file
List<byte> line_list = new List<byte>();
// ... filling line_list with data
// in this part I check that no byte added to line_list has the value 0, or else I display an errror message and end the process
q.Enqueue(line_list);
if (!backgroundWorkerNewLine.IsBusy) backgroundWorkerNewLine.RunWorkerAsync(); // if the other BackgroundWorker isn't processing data, now it needs to since we just added some to the queue
// Processing thread
private void backgroundWorkerNewLine_DoWork(object sender, DoWorkEventArgs e)
while (q.Count > 0) // While there is data to process
string line_str = DecodeBytes(new List<byte>(q.Dequeue())); // Decoding
string[] elements = line_str.Split(separator, StringSplitOptions.None); // Separating values
Form1.ActiveForm.Invoke(new MethodInvoker(() => AddRow(elements))); // Add the line to a DataTable from the main thread
public string DecodeBytes(List<byte> line)
/// ... read each byte and return a string of the whole decoded line
public void AddRow(string[] el)
MyDataTable.Rows.Add(el);
q.Dequeue() 返回的 List 似乎与 q.Enqueue() 添加的数据不同
【问题讨论】:
在两个不同的线程上同时读写会让你很痛苦。 使用ConcurrentQueue
【参考方案1】:
您应该使用 Microsoft 的反应式框架(又名 Rx)- NuGet System.Reactive.Windows.Forms
(假设您正在编写 WinForms 应用程序)并添加 using System.Reactive.Linq;
。
Rx 让您使用熟悉的 LINQ 语法来处理并行操作。
您尚未向我们展示如何将文件分解为 List<byte>
的列表,因此我假设您有一个类似于 IObservable<List<byte>> DeconstructFile(FileInfo fileInfo)
的方法。
现在你可以这样做了:
IObservable<string[]> query =
from bytes in DeconstructFile(new FileInfo("myFile.bin"))
from line_str in Observable.Start(() => DecodeBytes(bytes))
select line_str.Split(separator, StringSplitOptions.None);
IDisposable subscription =
query
.ObserveOn(Form1.ActiveForm)
.Subscribe(el => MyDataTable.Rows.Add(el));
就是这样。它并行运行,Observable.Start
根据需要启动新线程,并自动将结果传递给每个步骤。 .ObserveOn(Form1.ActiveForm)
自动将 .Subscribe
编组到 UI 线程。
如果您需要在代码完成之前停止代码,只需致电subscription.Dispose()
。很简单。
【讨论】:
【参考方案2】:当您创建多线程应用程序时,您必须非常小心,以防止不同的线程同时访问共享资源。如果你不阻止它,坏事就会发生。您正在丢失更新,您的数据结构已损坏,所有这些都在不可预测且不一致的情况下发生。为了避免这些问题,您应该同步从不同线程对共享资源的所有访问。这可以通过使用lock
语句来实现。所以建议是:在阅读和更新共享资源时始终锁定。您的共享资源是Queue
。你应该像这样锁定:
// Reading thread
lock (q)
q.Enqueue(line_list);
// Processing thread
while (true)
List<byte> list;
lock (q)
if (q.Count == 0) break;
list = new List<byte>(q.Dequeue());
string line_str = DecodeBytes(list); // Decoding
// ...
锁定的缺点是它会产生争用,因此您不应锁定超过绝对必要的次数。尤其要避免在持有锁时进行繁重的计算。
除此之外,您尝试实现的模式是生产者-消费者模式,而 .NET 提供了一个专门的类来促进这种模式。它是BlockingCollection
类,它为您处理所有这些混乱的线程同步。它可以帮助您减少必须编写的代码,但代价是学习曲线很小。 Add
、CompleteAdding
、GetConsumingEnumerable
这些方法你基本都需要学习了,就可以开始了。
【讨论】:
以上是关于使用队列在两个 BackgroundWorker 之间传递数据的主要内容,如果未能解决你的问题,请参考以下文章
两个同时运行的线程控件(BackgroundWorker)串数据
(C#) BackgroundWorker() ProgressChanged 不工作