在进行多线程处理时保存数据

Posted

技术标签:

【中文标题】在进行多线程处理时保存数据【英文标题】:Saving data while doing multithreaded processing 【发布时间】:2016-10-01 07:31:55 【问题描述】:

有一个下载器应用程序,它在多个线程中对下载项目执行不同类型的处理。一些线程分析输入数据,一些执行下载、提取、保存状态等。因此,每种类型的线程都对某些数据成员进行操作,其中一些线程可能同时运行。下载项可以这样描述:

class File;

class Download

public:
    enum State
    
        Parsing, Downloading, Extracting, Repairing, Finished
    ;

    Download(const std::string &filePath): filePath(filePath)  

    void save()
    
        // TODO: save data consistently

        StateFile f;    // state file for this download

        // save general download parameters
        f << filePath << state << bytesWritten << totalFiles << processedFiles;

        // Now we are to save the parameters of the files which belong to this download,
        // (!) but assume the downloading thread kicks in, downloads some data and 
        // changes the state of a file. That causes "bytesWritten", "processedFiles" 
        // and "state" to be different from what we have just saved.

        // When we finally save the state of the files their parameters don't match 
        // the parameters of the download (state, bytesWritten, processedFiles).
        for (File *f : files)
        
            // save the file...
        
    

private:
    std::string filePath;
    std::atomic<State> state = Parsing;
    std::atomic<int> bytesWritten = 0;
    int totalFiles = 0;
    std::atomic<int> processedFiles = 0;
    std::mutex fileMutex;
    std::vector<File*> files;
;

我想知道如何一致地保存这些数据。例如,状态和已处理文件的数量可能已经保存,我们将保存文件列表。同时其他一些线程可能会改变文件的状态,从而改变处理文件的数量或下载的状态,使保存的数据不一致。

想到的第一个想法是为所有数据成员添加一个互斥锁,并在每次访问它们时锁定它任何。但这可能是低效的,因为大多数时间线程访问不同的数据成员并且在几分钟内只进行一次保存。

在我看来这样的任务在多线程编程中相当普遍,所以我希望有经验的人能提出更好的方法。

【问题讨论】:

“想到的第一个想法是为所有数据成员添加一个互斥锁,并在每次访问它们时锁定它。” - 为什么不能使用多个互斥锁并锁定对单个成员的访问?为什么不将类拆分为多个不同的类,以便每个线程可以安静地处理自己的数据片段,直到完成并将部分结果组装成最终结果? 好吧,如上所述,锁定单个成员并不能防止整个数据集被不一致地保存。例如。下载的保存状态和处理的文件数量可能与保存的文件列表不匹配。好吧,线程可能使用相同的数据成员。我只是说他们可能不会使用所有这些。 【参考方案1】:

我建议你使用生产者消费者模式。

下载器产生给解析器并通知它消费,解析器产生给提取器并通知它消费,提取器给修复器。 然后,您将为每个任务创建一个队列。同步可以使用条件变量进行优化,因此消费者只有在生产了某些东西后才收到通知时才拉动。您最终将使用更少的互斥锁和更具可读性和效率的设计。

这是一个队列的示例代码,如果您必须同时下载、解析、提取和保存,该怎么做:

#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu

public:
    T consume_one()
    
        std::unique_lock<std::mutex> lock(lock_);
        while (queue_.size() == 0)
            condition_.wait(lock); //release and obtain again
        T available_data = queue_.front();
        queue_.pop();
        return available_data;
    
    void produce_one(const T& data)
    
        std::unique_lock<std::mutex> lock(lock_);
        queue_.push(data);
        condition_.notify_one();//notify only one or all as per your design...
    
private:
    std::queue<T> queue_;
    std::mutex lock_;
    std::condition_variable condition_;
;
struct data

    //.....
;

void download(synchronized_queu<data>& q)

    //...
    data data_downloaded = ; //data downloaded;
    q.produce_one(data_downloaded);


void parse(synchronized_queu<data>& q1, synchronized_queu<data>& q2)

    //...
    data data_downloaded = q1.consume_one();
    //parse
    data data_parsed = ;//....
    q2.produce_one(data_parsed);


void extract(synchronized_queu<data>& q1, synchronized_queu<data>& q2)

    //...
    data data_parsed = q1.consume_one();
    //parse
    data data_extracted = ;//....
    q2.produce_one(data_extracted);

void save(synchronized_queu<data>& q)

    data data_extracted = q.consume_one();
    //save....


int main()

    synchronized_queu<data> dowlowded_queue;
    synchronized_queu<data> parsed_queue;
    synchronized_queu<data> extracted_queue;

    std::thread downloader(download, dowlowded_queue);
    std::thread parser(parse, dowlowded_queue, parsed_queue);
    std::thread extractor(extract, parsed_queue, extracted_queue);
    std::thread saver(save, extracted_queue);
    while (/*condition to stop program*/)
    

    
    downloader.join();
    parser.join();
    extractor.join();
    saver.join();
    return 0;

【讨论】:

问题是这些任务打算同时运行,所以没有队列。无论其他线程在做什么,都需要定期进行保存。 是的,这就是我想建议的。为了简单的解释,让我们只有下载器和提取器;您同时启动共享同一队列 Q1 的两个线程下载器和提取器。下载器一旦下载了任何东西,它就会推送到队列中,并且当提取器从队列中拉出东西时,它会处理它...... 我的问题是在其他线程处理数据时保存数据。 保存器可能是一个在队列上工作的任务,其中包含要保存的最终数据。 必须每隔3分钟保存一次下载状态,以便程序在崩溃或异常终止时能够加载并继续下载。因此,即使是中间结果也必须保存。

以上是关于在进行多线程处理时保存数据的主要内容,如果未能解决你的问题,请参考以下文章

多线程系列之五:Balking 模式

使用多线程处理和保存图像

c#中利用system.timers多线程做图像处理,图像保存时提示“GDI+ 中发生一般性错误”,如何解决?

使用线程的场景

在循环中保存图像比多线程/多处理更快

在后台线程问题中将服务器数据保存到核心数据