多线程文件压缩
Posted
技术标签:
【中文标题】多线程文件压缩【英文标题】:Multithreading file compress 【发布时间】:2015-11-09 05:18:50 【问题描述】:我刚刚开始使用线程, 我想写一个简单的文件压缩器。它应该创建两个后台线程——一个用于读取,另一个用于写入。第一个应该按小块读取文件并将它们放入队列中,其中int - 是chunkId。第二个线程应该将块出列并按顺序(使用chunkId)将它们写入输出流(文件,该线程在开始时创建的文件)。
我做到了。但我不明白为什么在我的程序结束并打开我的 gzip 文件后 - 我看到,我的块混合了,并且文件没有以前的顺序。
public static class Reader
private static readonly object Locker = new object();
private const int ChunkSize = 1024*1024;
private static readonly int MaxThreads;
private static readonly Queue<KeyValuePair<int, byte[]>> ChunksQueue;
private static int _chunksComplete;
static Reader()
MaxThreads = Environment.ProcessorCount;
ChunksQueue = new Queue<KeyValuePair<int,byte[]>>(MaxThreads);
public static void Read(string filename)
_chunksComplete = 0;
var tRead = new Thread(Reading) IsBackground = true ;
var tWrite = new Thread(Writing) IsBackground = true ;
tRead.Start(filename);
tWrite.Start(filename);
tRead.Join();
tWrite.Join();
Console.WriteLine("Finished");
private static void Writing(object threadContext)
var filename = (string) threadContext;
using (var s = File.Create(filename + ".gz"))
while (true)
var dataPair = DequeueSafe();
if (dataPair.Value == null)
return;
while (dataPair.Key != _chunksComplete)
Thread.Sleep(1);
Console.WriteLine("write chunk 0", dataPair.Key);
using (var gz = new GZipStream(s, CompressionMode.Compress, true))
gz.Write(dataPair.Value, 0, dataPair.Value.Length);
_chunksComplete++;
private static void Reading(object threadContext)
var filename = (string) threadContext;
using (var s = File.OpenRead(filename))
var counter = 0;
var buffer = new byte[ChunkSize];
while (s.Read(buffer, 0, buffer.Length) != 0)
while (ChunksQueue.Count == MaxThreads)
Thread.Sleep(1);
Console.WriteLine("read chunk 0", counter);
var dataPair = new KeyValuePair<int, byte[]>(counter, buffer);
EnqueueSafe(dataPair);
counter++;
EnqueueSafe(new KeyValuePair<int, byte[]>(0, null));
private static void EnqueueSafe(KeyValuePair<int, byte[]> dataPair)
lock (ChunksQueue)
ChunksQueue.Enqueue(dataPair);
private static KeyValuePair<int, byte[]> DequeueSafe()
while (true)
lock (ChunksQueue)
if (ChunksQueue.Count > 0)
return ChunksQueue.Dequeue();
Thread.Sleep(1);
更新: 我只能使用 .NET 3.5
【问题讨论】:
您应该切换到BlockingCollection
,并将ConcurrentQueue
作为基础集合。您不再需要锁定,也不再需要 Thread.Sleep(1)
,因为如果没有可用数据,它将等待数据到达。
我忘了说我只能使用 .net 3.5!
为什么不简单地读取一个缓冲区,然后在压缩+写入该缓冲区时读取下一个?只需要一个线程,您使用异步 I/O,如果 compress+write 线程比读取线程花费更多时间来完成其工作,则不要冒险用您已读取的缓冲区填充队列。
这只是一种练习。这个例子与现实生活中的例子没有任何联系。但是谢谢你的想法!
【参考方案1】:
虽然alexm's answer 确实提出了一个非常重要的观点,即Stream.Read
填充buffer
的字节数可能少于您请求的字节数,但您遇到的主要问题是您只有一个byte[]
您一直在使用一遍又一遍。
当您的读取循环读取第二个值时,它会覆盖位于您传递给队列的dataPair
内的byte[]
。您必须有一个 buffer = new byte[ChunkSize];
inside 循环来解决这个问题。您还必须记录读取的字节数,并且只写入相同的字节数。
您不需要在对中保留counter
,因为Queue
将保持顺序,使用对中的int
来存储记录的字节数,如alexm 的示例中所示。
【讨论】:
好的,我明白了。我已经像@alexm 一样编辑了我的代码,你解释了,现在我按顺序写入所有字节。正如你所说,我已经更换了柜台,现在没有必要了。但我有最后一个问题 - 为什么 alexm 告诉我编辑 Writer 并将其作为第三个参数 - 值键(而不是值长度......)? 我认为这是错误的。第三个参数是写入缓冲区的长度。 @matterai 因为缓冲区在它的末尾保存垃圾数据,所以它不是 100% 充满了你想要的数据。在 alexm 的回答中,他在密钥中保存了您要读取的“好字节”的长度,在编写器中,您只需要在其中写入“好字节”并在最后排除垃圾字节。【参考方案2】:Stream.Read()
返回它消耗的实际字节数。使用它来限制 writer 的块大小。而且,由于涉及并发读取和写入,您将需要多个缓冲区。
尝试 4096 作为块大小。
读者:
var buffer = new byte[ChunkSize];
int bytesRead = s.Read(buffer, 0, buffer.Length);
while (bytesRead != 0)
...
var dataPair = new KeyValuePair<int, byte[]>(bytesRead, buffer);
buffer = new byte[ChunkSize];
bytesRead = s.Read(buffer, 0, buffer.Length);
作者:
gz.Write(dataPair.Value, 0, dataPair.Key)
PS:性能可以通过添加一个空闲数据缓冲区池而不是每次都分配新缓冲区并使用事件(例如ManualResetEvent
)来表示队列为空、队列来提高性能已满 而不是使用Thread.Sleep()
。
【讨论】:
Stream.Read
的一个好技巧我学会了使用 for (int bytesRead = s.Read(buffer, 0, buffer.Length); bytesRead != 0; bytesRead = s.Read(buffer, 0, buffer.Length)) ...
而不是 while 循环来合并两个读取调用。
@alexm 所以......我的块是混合的,因为我有并发读写?我说的对吗?
@matterai 有点混在一起,因为你覆盖了传入队列的byte[]
,如果你没有并发读写,你可以逃脱它,但是因为你在有 1 次写入之前可以进行两次读取,您必须在每个循环中创建一个新的 byte[]
。但是,您也确实有第二个错误,即不尊重所读取的字节长度,这是 alexm 在他的回答中所关注的。以上是关于多线程文件压缩的主要内容,如果未能解决你的问题,请参考以下文章
快如闪电:Linux多线程压缩软件pigz和压缩神器zstd