如何在 C++ 中异步读/写?
Posted
技术标签:
【中文标题】如何在 C++ 中异步读/写?【英文标题】:How to asynchronously read/write in C++? 【发布时间】:2015-06-03 11:08:03 【问题描述】:如何使用 C++ 中的专用读/写线程将一个流复制到另一个流?
假设我有这些方法(不是真实的,但为了说明这一点)来读取/写入数据。这些读/写函数可以代表任何东西(网络/文件/USB/串行/等)。
// returns the number of bytes read
void read(char* buffer, int bufferSize, int* bytesRead);
// returns the number of bytes written
void write(char* buffer, int bufferSize, int* bytesWritten);
解决方案也应该是可移植的。
注意:我知道 Windows 有 FILE_FLAG_OVERLAPPED
功能,但这假设读/写是文件 IO。请记住,这些读/写方法可以代表任何东西。
【问题讨论】:
你能澄清一下你的问题吗?您的问题是关于从一个线程读取并写入另一个线程吗?或者从一个单独的线程中读取一个流并从另一个线程写入同一个流? 也看看 boost.asio。 确实需要一些说明,并且可能是一个使用示例。 “野外有图书馆已经在做这样的事情了吗?”就好像是?那提供那些声明?便携性怎么样? 【参考方案1】:这是我想出的解决方案。
标题
#pragma once
#include <stdlib.h>
#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <list>
#include <thread>
#define ASYNC_COPY_READ_WRITE_SUCCESS 0
struct BufferBlock;
struct ReadStream
// read a stream to a buffer.
// return non-zero if error occured
virtual int read(char* buffer, int bufferSize, int* bytesRead) = 0;
;
struct WriteStream
// write a buffer to a stream.
// return non-zero if error occured
virtual int write(char* buffer, int bufferSize, int* bytesWritten) = 0;
;
class BufferBlockManager
public:
BufferBlockManager(int numberOfBlocks, int bufferSize);
~BufferBlockManager();
void enqueueBlockForRead(BufferBlock* block);
void dequeueBlockForRead(BufferBlock** block);
void enqueueBlockForWrite(BufferBlock* block);
void dequeueBlockForWrite(BufferBlock** block);
void resetState();
private:
std::list<BufferBlock*> blocks;
std::queue<BufferBlock*> blocksPendingRead;
std::queue<BufferBlock*> blocksPendingWrite;
std::mutex queueLock;
std::chrono::milliseconds dequeueSleepTime;
;
void AsyncCopyStream(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream, int* readResult, int* writeResult);
CPP
#include "AsyncReadWrite.h"
struct BufferBlock
BufferBlock(int bufferSize) : buffer(NULL)
this->bufferSize = bufferSize;
this->buffer = new char[bufferSize];
this->actualSize = 0;
this->isLastBlock = false;
~BufferBlock()
this->bufferSize = 0;
free(this->buffer);
this->buffer = NULL;
this->actualSize = 0;
char* buffer;
int bufferSize;
int actualSize;
bool isLastBlock;
;
BufferBlockManager::BufferBlockManager(int numberOfBlocks, int bufferSize)
dequeueSleepTime = std::chrono::milliseconds(100);
for (int x = 0; x < numberOfBlocks; x++)
BufferBlock* block = new BufferBlock(bufferSize);
blocks.push_front(block);
blocksPendingRead.push(block);
BufferBlockManager::~BufferBlockManager()
for (std::list<BufferBlock*>::const_iterator iterator = blocks.begin(), end = blocks.end(); iterator != end; ++iterator)
delete (*iterator);
void BufferBlockManager::enqueueBlockForRead(BufferBlock* block)
queueLock.lock();
block->actualSize = 0;
block->isLastBlock = false;
blocksPendingRead.push(block);
queueLock.unlock();
void BufferBlockManager::dequeueBlockForRead(BufferBlock** block)
WAITFOR:
while (blocksPendingRead.size() == 0)
std::this_thread::sleep_for(dequeueSleepTime);
queueLock.lock();
if (blocksPendingRead.size() == 0)
queueLock.unlock();
goto WAITFOR;
*block = blocksPendingRead.front();
blocksPendingRead.pop();
queueLock.unlock();
void BufferBlockManager::enqueueBlockForWrite(BufferBlock* block)
queueLock.lock();
blocksPendingWrite.push(block);
queueLock.unlock();
void BufferBlockManager::dequeueBlockForWrite(BufferBlock** block)
WAITFOR:
while (blocksPendingWrite.size() == 0)
std::this_thread::sleep_for(dequeueSleepTime);
queueLock.lock();
if (blocksPendingWrite.size() == 0)
queueLock.unlock();
goto WAITFOR;
*block = blocksPendingWrite.front();
blocksPendingWrite.pop();
queueLock.unlock();
void BufferBlockManager::resetState()
queueLock.lock();
blocksPendingRead = std::queue<BufferBlock*>();
blocksPendingWrite = std::queue<BufferBlock*>();
for (std::list<BufferBlock*>::const_iterator iterator = blocks.begin(), end = blocks.end(); iterator != end; ++iterator)
(*iterator)->actualSize = 0;
queueLock.unlock();
struct AsyncCopyContext
AsyncCopyContext(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream)
this->bufferBlockManager = bufferBlockManager;
this->readStream = readStream;
this->writeStream = writeStream;
this->readResult = ASYNC_COPY_READ_WRITE_SUCCESS;
this->writeResult = ASYNC_COPY_READ_WRITE_SUCCESS;
BufferBlockManager* bufferBlockManager;
ReadStream* readStream;
WriteStream* writeStream;
int readResult;
int writeResult;
;
void ReadStreamThread(AsyncCopyContext* asyncContext)
int bytesRead = 0;
BufferBlock* readBuffer = NULL;
int readResult = ASYNC_COPY_READ_WRITE_SUCCESS;
while (
// as long there hasn't been any write errors
asyncContext->writeResult == ASYNC_COPY_READ_WRITE_SUCCESS
// and we haven't had an error reading yet
&& readResult == ASYNC_COPY_READ_WRITE_SUCCESS)
// let's deque a block to read to!
asyncContext->bufferBlockManager->dequeueBlockForRead(&readBuffer);
readResult = asyncContext->readStream->read(readBuffer->buffer, readBuffer->bufferSize, &bytesRead);
readBuffer->actualSize = bytesRead;
readBuffer->isLastBlock = bytesRead == 0;
if (readResult == ASYNC_COPY_READ_WRITE_SUCCESS)
// this was a valid read, go ahead and queue it for writing
asyncContext->bufferBlockManager->enqueueBlockForWrite(readBuffer);
else
// an error occured reading
asyncContext->readResult = readResult;
// since an error occured, lets queue an block to write indicatiting we are done and there are no more bytes to read
readBuffer->isLastBlock = true;
readBuffer->actualSize = 0;
asyncContext->bufferBlockManager->enqueueBlockForWrite(readBuffer);
if (readBuffer->isLastBlock) return;
void WriteStreamThread(AsyncCopyContext* asyncContext)
int bytesWritten = 0;
BufferBlock* writeBuffer = NULL;
int writeResult = ASYNC_COPY_READ_WRITE_SUCCESS;
bool isLastWriteBlock = false;
while (
// as long as there are no errors during reading
asyncContext->readResult == ASYNC_COPY_READ_WRITE_SUCCESS
// and we haven't had an error writing yet
&& writeResult == ASYNC_COPY_READ_WRITE_SUCCESS)
// lets dequeue a block for writing!
asyncContext->bufferBlockManager->dequeueBlockForWrite(&writeBuffer);
isLastWriteBlock = writeBuffer->isLastBlock;
if (writeBuffer->actualSize > 0)
writeResult = asyncContext->writeStream->write(writeBuffer->buffer, writeBuffer->actualSize, &bytesWritten);
if (writeResult == ASYNC_COPY_READ_WRITE_SUCCESS)
asyncContext->bufferBlockManager->enqueueBlockForRead(writeBuffer);
if (isLastWriteBlock) return;
else
asyncContext->writeResult = writeResult;
asyncContext->bufferBlockManager->enqueueBlockForRead(writeBuffer);
return;
void AsyncCopyStream(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream, int* readResult, int* writeResult)
AsyncCopyContext asyncContext(bufferBlockManager, readStream, writeStream);
std::thread readThread(ReadStreamThread, &asyncContext);
std::thread writeThread(WriteStreamThread, &asyncContext);
readThread.join();
writeThread.join();
*readResult = asyncContext.readResult;
*writeResult = asyncContext.writeResult;
用法
#include <stdio.h>
#include <tchar.h>
#include "AsyncReadWrite.h"
struct ReadTestStream : ReadStream
int readCount = 0;
int read(char* buffer, int bufferSize, int* bytesRead)
printf("Starting read...\n");
memset(buffer, bufferSize, 0);
if (readCount == 10)
*bytesRead = 0;
return 0;
// pretend this function takes a while!
std::this_thread::sleep_for(std::chrono::milliseconds(100));
char buff[100];
sprintf_s(buff, "This is read number %d\n", readCount);
strcpy_s(buffer, sizeof(buff), buff);
*bytesRead = strlen(buffer);
readCount++;
printf("Finished read...\n");
return 0;
;
struct WriteTestStream : WriteStream
int write(char* buffer, int bufferSize, int* bytesWritten)
printf("Starting write...\n");
// pretend this function takes a while!
std::this_thread::sleep_for(std::chrono::milliseconds(500));
printf(buffer);
printf("Finished write...\n");
return 0;
;
int _tmain(int argc, _TCHAR* argv[])
BufferBlockManager bufferBlockManager(5, 4096);
ReadTestStream readStream;
WriteTestStream writeStream;
int readResult = 0;
int writeResult = 0;
printf("Starting copy...\n");
AsyncCopyStream(&bufferBlockManager, &readStream, &writeStream, &readResult, &writeResult);
printf("Finished copy... readResult=%d writeResult=%d \n", readResult, writeResult);
getchar();
return 0;
编辑:我将我的解决方案放入 GitHub 存储库 here。如果您希望使用此代码,请参阅存储库,因为它可能比此答案更新。
【讨论】:
【参考方案2】:通常,每个方向只有一个线程,在读取和写入之间交替。
【讨论】:
以上是关于如何在 C++ 中异步读/写?的主要内容,如果未能解决你的问题,请参考以下文章