使用队列在两个 BackgroundWorker 之间传递数据

Posted

技术标签:

【中文标题】使用队列在两个 BackgroundWorker 之间传递数据【英文标题】:Using a Queue to pass data between two BackgroundWorkers 【发布时间】:2019-10-04 14:29:36 【问题描述】:

我正在逐字节读取和解码二进制文件。为此,我使用了两个 BackgroundWorkers :一个用于读取文件,它为我的文件的每个“行”生成一个可变大小的 List<byte>,另一个用于处理“行”。

由于我希望它们并行运行,而我不知道哪个会比另一个更快,所以我使用Queue 在两个BackgroundWorkers 之间传递数据。

事情是这样的:任何时候List<byte> 都不应该包含任何0 值。我检查之前将它们添加到队列中。尽管如此,在Queue 的另一端,一些列表包含0 值。然而,我在每次调用Dequeue() 时创建一个新的List<byte>,因为显然,如果我不这样做,数据会在处理完成之前被修改。

我尝试手动创建一个新的List<byte> 对象并然后为其分配Dequeue() 的结果,但没有改进。这是我第一次使用Queue,由于我的代码是多线程的,因此几乎不可能逐步调试。

Queue<List<byte>> q = new Queue<List<byte>>(); // My FIFO queue

// Reading thread
private void BackgroudWorkerRead_DoWork(object sender, DoWorkEventArgs e)

      // ... read the file
      List<byte> line_list = new List<byte>();
      // ... filling line_list with data
      // in this part I check that no byte added to line_list has the value 0, or else I display an errror message and end the process
      q.Enqueue(line_list);
      if (!backgroundWorkerNewLine.IsBusy) backgroundWorkerNewLine.RunWorkerAsync(); // if the other BackgroundWorker isn't processing data, now it needs to since we just added some to the queue


// Processing thread
private void backgroundWorkerNewLine_DoWork(object sender, DoWorkEventArgs e)

    while (q.Count > 0) // While there is data to process
    
          string line_str = DecodeBytes(new List<byte>(q.Dequeue())); // Decoding
          string[] elements = line_str.Split(separator, StringSplitOptions.None); // Separating values

          Form1.ActiveForm.Invoke(new MethodInvoker(() => AddRow(elements))); // Add the line to a DataTable from the main thread
    


public string DecodeBytes(List<byte> line)

 /// ... read each byte and return a string of the whole decoded line


public void AddRow(string[] el)

    MyDataTable.Rows.Add(el);

q.Dequeue() 返回的 List 似乎与 q.Enqueue() 添加的数据不同

【问题讨论】:

在两个不同的线程上同时读写会让你很痛苦。 使用ConcurrentQueue 【参考方案1】:

您应该使用 Microsoft 的反应式框架(又名 Rx)- NuGet System.Reactive.Windows.Forms(假设您正在编写 WinForms 应用程序)并添加 using System.Reactive.Linq;

Rx 让您使用熟悉的 LINQ 语法来处理并行操作。

您尚未向我们展示如何将文件分解为 List&lt;byte&gt; 的列表,因此我假设您有一个类似于 IObservable&lt;List&lt;byte&gt;&gt; DeconstructFile(FileInfo fileInfo) 的方法。

现在你可以这样做了:

IObservable<string[]> query =
    from bytes in DeconstructFile(new FileInfo("myFile.bin"))
    from line_str in Observable.Start(() => DecodeBytes(bytes))
    select line_str.Split(separator, StringSplitOptions.None);

IDisposable subscription =
    query
        .ObserveOn(Form1.ActiveForm)
        .Subscribe(el => MyDataTable.Rows.Add(el));

就是这样。它并行运行,Observable.Start 根据需要启动新线程,并自动将结果传递给每个步骤。 .ObserveOn(Form1.ActiveForm) 自动将 .Subscribe 编组到 UI 线程。

如果您需要在代码完成之前停止代码,只需致电subscription.Dispose()。很简单。

【讨论】:

【参考方案2】:

当您创建多线程应用程序时,您必须非常小心,以防止不同的线程同时访问共享资源。如果你不阻止它,坏事就会发生。您正在丢失更新,您的数据结构已损坏,所有这些都在不可预测且不一致的情况下发生。为了避免这些问题,您应该同步从不同线程对共享资源的所有访问。这可以通过使用lock 语句来实现。所以建议是:在阅读更新共享资源时始终锁定。您的共享资源是Queue。你应该像这样锁定:

// Reading thread
lock (q)

    q.Enqueue(line_list);


// Processing thread
while (true)

    List<byte> list;
    lock (q)
    
        if (q.Count == 0) break;
        list = new List<byte>(q.Dequeue());
    
    string line_str = DecodeBytes(list); // Decoding
    // ...

锁定的缺点是它会产生争用,因此您不应锁定超过绝对必要的次数。尤其要避免在持有锁时进行繁重的计算。

除此之外,您尝试实现的模式是生产者-消费者模式,而 .NET 提供了一个专门的类来促进这种模式。它是BlockingCollection 类,它为您处理所有这些混乱的线程同步。它可以帮助您减少必须编写的代码,但代价是学习曲线很小。 AddCompleteAddingGetConsumingEnumerable这些方法你基本都需要学习了,就可以开始了。

【讨论】:

以上是关于使用队列在两个 BackgroundWorker 之间传递数据的主要内容,如果未能解决你的问题,请参考以下文章

两个同时运行的线程控件(BackgroundWorker)串数据

(C#) BackgroundWorker() ProgressChanged 不工作

WP7 为两个 BackgroundWorker 对象触发一个 RunWorkerCompleted 事件

C#timer backgroundworker用户控件

怎么在backgroundWorker执行时传递数据

如何将 BackgroundWorker 的结果设置为 TextView