如何在 C++ 中异步读/写?

Posted

技术标签:

【中文标题】如何在 C++ 中异步读/写?【英文标题】:How to asynchronously read/write in C++? 【发布时间】:2015-06-03 11:08:03 【问题描述】:

如何使用 C++ 中的专用读/写线程将一个流复制到另一个流?

假设我有这些方法(不是真实的,但为了说明这一点)来读取/写入数据。这些读/写函数可以代表任何东西(网络/文件/USB/串行/等)。

// Reads data into buffer (capacity bufferSize). Despite the original wording,
// nothing is returned: the number of bytes read is reported through *bytesRead.
void read(char* buffer, int bufferSize, int* bytesRead);
// Writes data from buffer (bufferSize is presumably the number of bytes to write).
// Nothing is returned: the number of bytes written is reported through *bytesWritten.
void write(char* buffer, int bufferSize, int* bytesWritten);

解决方案也应该是可移植的。

注意:我知道 Windows 有 FILE_FLAG_OVERLAPPED 功能,但这假设读/写是文件 IO。请记住,这些读/写方法可以代表任何东西。

【问题讨论】:

你能澄清一下你的问题吗?你是想在一个线程中读取、在另一个线程中写入吗?还是想在一个单独的线程中读取某个流,并在另一个线程中写入同一个流? 另外也可以看看 boost.asio。 确实需要一些说明,最好再给出一个使用示例。 “现成的库里有没有已经实现这类功能的?”——比如什么样的库?是指提供这些函数声明的库吗?可移植性又如何? 【参考方案1】:

这是我想出的解决方案。

头文件

#pragma once

#include <stdlib.h>

#include <atomic>
#include <chrono>
#include <list>
#include <mutex>
#include <queue>
#include <thread>

// Result code meaning "no error". The stream interfaces below return non-zero
// on failure, and the copy threads compare their results against this value.
#define ASYNC_COPY_READ_WRITE_SUCCESS 0

// Forward declaration only; the full definition lives in the .cpp file.
struct BufferBlock;

// Abstract source of bytes. Implement this to wrap a file, socket, serial
// port or any other input that can fill a caller-supplied buffer.
struct ReadStream
{
    // Virtual destructor so implementations can be destroyed through a
    // ReadStream pointer without undefined behavior.
    virtual ~ReadStream() = default;

    // Read from the stream into buffer (capacity bufferSize) and store the
    // number of bytes read in *bytesRead.
    // Returns non-zero if an error occurred.
    virtual int read(char* buffer, int bufferSize, int* bytesRead) = 0;
};

// Abstract sink of bytes. Implement this to wrap a file, socket, serial
// port or any other output that can consume a caller-supplied buffer.
struct WriteStream
{
    // Virtual destructor so implementations can be destroyed through a
    // WriteStream pointer without undefined behavior.
    virtual ~WriteStream() = default;

    // Write bufferSize bytes from buffer to the stream and store the number
    // of bytes written in *bytesWritten.
    // Returns non-zero if an error occurred.
    virtual int write(char* buffer, int bufferSize, int* bytesWritten) = 0;
};

class BufferBlockManager


public:

    BufferBlockManager(int numberOfBlocks, int bufferSize);
    ~BufferBlockManager();

    void enqueueBlockForRead(BufferBlock* block);
    void dequeueBlockForRead(BufferBlock** block);
    void enqueueBlockForWrite(BufferBlock* block);
    void dequeueBlockForWrite(BufferBlock** block);
    void resetState();

private:

    std::list<BufferBlock*> blocks;
    std::queue<BufferBlock*> blocksPendingRead;
    std::queue<BufferBlock*> blocksPendingWrite;
    std::mutex queueLock;
    std::chrono::milliseconds dequeueSleepTime;

;

// Copies readStream to writeStream using one reader thread and one writer thread
// that exchange buffers through bufferBlockManager. Returns after both threads
// have finished; their result codes are stored in *readResult and *writeResult.
void AsyncCopyStream(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream, int* readResult, int* writeResult);

CPP

#include "AsyncReadWrite.h"

// One reusable buffer that travels between the reader and the writer.
struct BufferBlock
{
    // Allocates a buffer of bufferSize bytes; the block starts out empty.
    explicit BufferBlock(int bufferSize)
        : buffer(new char[bufferSize])
        , bufferSize(bufferSize)
        , actualSize(0)
        , isLastBlock(false)
    {
    }

    // The buffer comes from new[], so it must be released with delete[]
    // (the previous free() call was a mismatched deallocation).
    ~BufferBlock()
    {
        delete[] buffer;
        buffer = nullptr;
        bufferSize = 0;
        actualSize = 0;
    }

    // A copy would share the same buffer and free it twice.
    BufferBlock(const BufferBlock&) = delete;
    BufferBlock& operator=(const BufferBlock&) = delete;

    char* buffer;       // storage owned by this block
    int bufferSize;     // capacity of buffer in bytes
    int actualSize;     // number of valid bytes currently held in buffer
    bool isLastBlock;   // true when no more data follows this block
};

// Creates the block pool: numberOfBlocks blocks of bufferSize bytes each.
// Every block is recorded in the ownership list and queued for the reader.
// The polling interval used by the dequeue functions is set to 100 ms.
BufferBlockManager::BufferBlockManager(int numberOfBlocks, int bufferSize)
    : dequeueSleepTime(100)
{
    for (int index = 0; index < numberOfBlocks; ++index)
    {
        BufferBlock* block = new BufferBlock(bufferSize);
        blocks.push_front(block);
        blocksPendingRead.push(block);
    }
}

// Deletes every block recorded in the ownership list, regardless of which
// queue (if any) currently references it.
BufferBlockManager::~BufferBlockManager()
{
    for (BufferBlock* block : blocks)
    {
        delete block;
    }
}

// Returns a block to the reader's queue after clearing its size and
// last-block flag, so the reader always receives a clean block.
void BufferBlockManager::enqueueBlockForRead(BufferBlock* block)
{
    // lock_guard releases the mutex even if push() throws.
    std::lock_guard<std::mutex> guard(queueLock);

    block->actualSize = 0;
    block->isLastBlock = false;
    blocksPendingRead.push(block);
}

// Blocks the calling thread until a block is available for reading into,
// then removes it from the read queue and stores it in *block.
// The queue is only ever inspected while the mutex is held; when it is empty
// the mutex is released and the thread sleeps for dequeueSleepTime before
// checking again.
void BufferBlockManager::dequeueBlockForRead(BufferBlock** block)
{
    for (;;)
    {
        {
            std::lock_guard<std::mutex> guard(queueLock);

            if (!blocksPendingRead.empty())
            {
                *block = blocksPendingRead.front();
                blocksPendingRead.pop();
                return;
            }
        }

        // Nothing available yet: sleep without holding the lock.
        std::this_thread::sleep_for(dequeueSleepTime);
    }
}

// Hands a filled block to the writer's queue. The block's contents and
// flags are left exactly as the reader set them.
void BufferBlockManager::enqueueBlockForWrite(BufferBlock* block)
{
    // lock_guard releases the mutex even if push() throws.
    std::lock_guard<std::mutex> guard(queueLock);

    blocksPendingWrite.push(block);
}

// Blocks the calling thread until a filled block is available, then removes
// it from the write queue and stores it in *block.
// The queue is only ever inspected while the mutex is held; when it is empty
// the mutex is released and the thread sleeps for dequeueSleepTime before
// checking again.
void BufferBlockManager::dequeueBlockForWrite(BufferBlock** block)
{
    for (;;)
    {
        {
            std::lock_guard<std::mutex> guard(queueLock);

            if (!blocksPendingWrite.empty())
            {
                *block = blocksPendingWrite.front();
                blocksPendingWrite.pop();
                return;
            }
        }

        // Nothing available yet: sleep without holding the lock.
        std::this_thread::sleep_for(dequeueSleepTime);
    }
}

void BufferBlockManager::resetState()

    queueLock.lock();

    blocksPendingRead = std::queue<BufferBlock*>();
    blocksPendingWrite = std::queue<BufferBlock*>();

    for (std::list<BufferBlock*>::const_iterator iterator = blocks.begin(), end = blocks.end(); iterator != end; ++iterator) 
        (*iterator)->actualSize = 0;
    

    queueLock.unlock();


struct AsyncCopyContext

    AsyncCopyContext(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream)
    
        this->bufferBlockManager = bufferBlockManager;
        this->readStream = readStream;
        this->writeStream = writeStream;
        this->readResult = ASYNC_COPY_READ_WRITE_SUCCESS;
        this->writeResult = ASYNC_COPY_READ_WRITE_SUCCESS;
    
    BufferBlockManager* bufferBlockManager;
    ReadStream* readStream;
    WriteStream* writeStream;
    int readResult;
    int writeResult;
;

// Reader thread body: repeatedly takes an empty block, fills it from the
// read stream and hands it to the writer. Stops after queueing the final
// block (end of stream or read error), or once the writer reports an error.
void ReadStreamThread(AsyncCopyContext* asyncContext)
{
    int readResult = ASYNC_COPY_READ_WRITE_SUCCESS;

    while (
        // as long as there haven't been any write errors
        asyncContext->writeResult == ASYNC_COPY_READ_WRITE_SUCCESS
        // and we haven't had an error reading yet
        && readResult == ASYNC_COPY_READ_WRITE_SUCCESS)
    {
        // take an empty block to read into
        BufferBlock* readBuffer = NULL;
        asyncContext->bufferBlockManager->dequeueBlockForRead(&readBuffer);

        // start from zero so a stream that leaves *bytesRead untouched is
        // treated as end of stream instead of re-sending a stale count
        int bytesRead = 0;
        readResult = asyncContext->readStream->read(readBuffer->buffer, readBuffer->bufferSize, &bytesRead);

        const bool readFailed = readResult != ASYNC_COPY_READ_WRITE_SUCCESS;
        if (readFailed)
        {
            // publish the error, then send an empty final block so the
            // writer knows there is nothing more to come
            asyncContext->readResult = readResult;
            bytesRead = 0;
        }

        // Decide this BEFORE handing the block over: once it is queued the
        // writer may recycle it, and enqueueBlockForRead() resets isLastBlock.
        const bool isLastBlock = readFailed || bytesRead == 0;

        readBuffer->actualSize = bytesRead;
        readBuffer->isLastBlock = isLastBlock;

        asyncContext->bufferBlockManager->enqueueBlockForWrite(readBuffer);

        if (isLastBlock) return;
    }
}

// Writer thread body: repeatedly takes a filled block, writes its contents
// to the write stream and returns the block to the reader. Stops after the
// final block, on a write error, or once the reader reports an error.
void WriteStreamThread(AsyncCopyContext* asyncContext)
{
    int writeResult = ASYNC_COPY_READ_WRITE_SUCCESS;

    while (
        // as long as there are no errors during reading
        asyncContext->readResult == ASYNC_COPY_READ_WRITE_SUCCESS
        // and we haven't had an error writing yet
        && writeResult == ASYNC_COPY_READ_WRITE_SUCCESS)
    {
        // take the next filled block
        BufferBlock* writeBuffer = NULL;
        asyncContext->bufferBlockManager->dequeueBlockForWrite(&writeBuffer);

        // remember the flag now; enqueueBlockForRead() clears it below
        const bool isLastWriteBlock = writeBuffer->isLastBlock;

        // NOTE(review): bytesWritten is not inspected, so a partial write
        // reported by the stream is not retried here.
        int bytesWritten = 0;
        if (writeBuffer->actualSize > 0)
            writeResult = asyncContext->writeStream->write(writeBuffer->buffer, writeBuffer->actualSize, &bytesWritten);

        const bool writeFailed = writeResult != ASYNC_COPY_READ_WRITE_SUCCESS;

        // publish the error before recycling the block so the reader sees it
        // when it picks the block up again
        if (writeFailed)
            asyncContext->writeResult = writeResult;

        asyncContext->bufferBlockManager->enqueueBlockForRead(writeBuffer);

        if (writeFailed || isLastWriteBlock) return;
    }
}

// Copies readStream to writeStream with a dedicated reader thread and a
// dedicated writer thread that exchange buffers through bufferBlockManager.
// Blocks until both threads have finished.
//   readResult  - receives the reader's result code; may be NULL if not needed
//   writeResult - receives the writer's result code; may be NULL if not needed
void AsyncCopyStream(BufferBlockManager* bufferBlockManager, ReadStream* readStream, WriteStream* writeStream, int* readResult, int* writeResult)
{
    AsyncCopyContext asyncContext(bufferBlockManager, readStream, writeStream);

    // the context outlives both threads because they are joined below
    std::thread readThread(ReadStreamThread, &asyncContext);
    std::thread writeThread(WriteStreamThread, &asyncContext);

    readThread.join();
    writeThread.join();

    if (readResult != NULL)
        *readResult = asyncContext.readResult;
    if (writeResult != NULL)
        *writeResult = asyncContext.writeResult;
}
用法

#include <stdio.h>
#include <tchar.h>
#include "AsyncReadWrite.h"

struct ReadTestStream : ReadStream

    int readCount = 0;
    int read(char* buffer, int bufferSize, int* bytesRead)
    
        printf("Starting read...\n");

        memset(buffer, bufferSize, 0);

        if (readCount == 10)
        
            *bytesRead = 0;
            return 0;
        

        // pretend this function takes a while!
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        char buff[100];
        sprintf_s(buff, "This is read number %d\n", readCount);
        strcpy_s(buffer, sizeof(buff), buff);

        *bytesRead = strlen(buffer);

        readCount++;

        printf("Finished read...\n");

        return 0;
    
;

struct WriteTestStream : WriteStream

    int write(char* buffer, int bufferSize, int* bytesWritten)
    
        printf("Starting write...\n");

        // pretend this function takes a while!
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        printf(buffer);

        printf("Finished write...\n");

        return 0;
    
;

int _tmain(int argc, _TCHAR* argv[])

    BufferBlockManager bufferBlockManager(5, 4096);
    ReadTestStream readStream;
    WriteTestStream writeStream;
    int readResult = 0;
    int writeResult = 0;

    printf("Starting copy...\n");

    AsyncCopyStream(&bufferBlockManager, &readStream, &writeStream, &readResult, &writeResult);

    printf("Finished copy... readResult=%d writeResult=%d \n", readResult, writeResult);

    getchar();

    return 0;

编辑:我将我的解决方案放入 GitHub 存储库 here。如果您希望使用此代码,请参阅存储库,因为它可能比此答案更新。

【讨论】:

【参考方案2】:

通常,每个方向只有一个线程,在读取和写入之间交替。

【讨论】:

以上是关于如何在 C++ 中异步读/写?的主要内容,如果未能解决你的问题,请参考以下文章

使用 C++ 在 Linux 中计算磁盘读/写

异步fifo的Verilog实现

如何在 C++ 中将数据读/写到 excel 2007 中?

如何在 C++ 中运行异步增强功能?

我如何在 C++ 中使用异步函数?

如何在 C++ 中异步执行 curl_multi_perform()?