用cshell怎么逐行读文件逐行进行处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用cshell怎么逐行读文件逐行进行处理相关的知识,希望对你有一定的参考价值。

参考技术A #include <stdio.h>
#include <stdlib.h>
#define MAXLINE 100
int main()


FILE *fp;
char arr[MAXLINE+1];

if ((fp = fopen ("MYFILE", "r")) == NULL)

perror ("File open error!\n");
exit (1);

while ((fgets (arr, MAXLINE, fp)) != NULL)

fputs (arr, stdout);

return 0;本回答被提问者和网友采纳

随机无限线程和各种错误,同时并行逐行读/写

【中文标题】随机无限线程和各种错误,同时并行逐行读/写【英文标题】:Randomly endless thread and various bugs, while parallel line-by-line reading/writing 【发布时间】:2010-07-11 15:02:26 【问题描述】:

我想基于boost::thread逐行实现并行读-处理-写,但是当前版本行为不定:下面的测试通过填充读(并发)队列来读取一个CSV文件,即简单地转移到写入队列中以写入输出文件(暂时不处理)。

遇到的问题:

在 Windows 和 Unix 上,程序随机永远不会结束(~3/5 次),并生成一个 SIGSEGV (~1/100) 在 Unix 上,有很多错误:创建线程时出现 SIGABRT,创建后“内存在分配块之前被破坏”(-> SIGABRT 也是如此),随机出现在 1 到 ~15 行之间。

我讨厌给出问题和代码并让其他人回答(我有时会站在你这边),但相信我,我想不出任何其他方法来纠正它(处理线程、调试一场噩梦),所以我提前道歉。这里是:

Main.cpp:

#include "io.hpp"

#include <iostream>

int main(int argc, char *argv[]) 
    CSV::Reader reader;
    CSV::Writer writer;

    if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) 
        CSV::Row row;

        reader.run(); //Reads the CSV file and fills the read queue
        writer.run(); //Reads the to-be-written queue and writes it to a txt file

        //The loop is supposed to end only if the reader is finished and empty
        while(!(reader.is_finished() && reader.empty())) 
            //Transfers line by line from the read to the to-be-written queues
            reader.wait_and_pop(row);
            writer.push(row);
        
        //The reader will likely finish before the writer, so he has to finish his queue before continuing.
        writer.finish(); 
    
    else 
        std::cout << "File error";
    

    return EXIT_SUCCESS;

Io.hpp:

#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED

#include "threads.hpp"

#include <fstream>

namespace CSV 
    class Row 
        std::vector<std::string> m_data;

        friend class Iterator;
        friend void write_row(Row const &row, std::ostream &stream);

        void read_next(std::istream& csv);

        public:
            inline std::string const& operator[](std::size_t index) const 
                return m_data[index];
            
            inline std::size_t size() const 
                return m_data.size();
            
    ;

    /** Reading *************************************************************************/

    class Iterator 
        public:
            Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) 
                ++(*this);
            
            Iterator() : m_csv(NULL) 

            //Pre-Increment
            Iterator& operator++() 
                if (m_csv != NULL) 
                    m_row.read_next(*m_csv);
                    m_csv = m_csv->good() ? m_csv : NULL;
                

                return *this;
            
            inline Row const& operator*() const 
                return m_row;
            

            inline bool operator==(Iterator const& rhs) 
                return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
            
            inline bool operator!=(Iterator const& rhs) 
                return !((*this) == rhs);
            
        private:
            std::istream* m_csv;
            Row m_row;
    ;

    class Reader : public Concurrent_queue<Row>, public Thread 
        std::ifstream m_csv;

        Thread_safe_value<bool> m_finished;

        void work() 
            if(!!m_csv) 
                for(Iterator it(m_csv) ; it != Iterator() ; ++it) 
                    push(*it);
                
                m_finished.set(true);
            
        

    public:
        Reader() 
            m_finished.set(false);
        

        inline bool open(std::string path) 
            m_csv.open(path.c_str());

            return !!m_csv;
        

        inline bool is_finished() 
            return m_finished.get();
        
    ;

    /** Writing ***************************************************************************/

    void write_row(Row const &row, std::ostream &stream);

    //Is m_finishing really thread-safe ? By the way, is it mandatory ?
    class Writer : public Concurrent_queue<Row>, public Thread 
        std::ofstream m_csv;

        Thread_safe_value<bool> m_finishing;

        void work() 
            if(!!m_csv) 
                CSV::Row row;

                while(!(m_finishing.get() && empty())) 
                    wait_and_pop(row);
                    write_row(row, m_csv);
                
            
        

    public:
        Writer() 
            m_finishing.set(false);
        

        inline void finish() 
            m_finishing.set(true);
            catch_up();
        

        inline bool open(std::string path) 
            m_csv.open(path.c_str());

            return !!m_csv;
        
    ;


#endif

Io.cpp:

#include "io.hpp"

#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>

void CSV::Row::read_next(std::istream& csv) 
    std::string row;
    std::getline(csv, row);

    boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
    m_data.assign(tokenizer.begin(), tokenizer.end());


void CSV::write_row(Row const &row, std::ostream &stream) 
    std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
    stream << std::endl;

Threads.hpp:

#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED

#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Thread 
protected:
    boost::thread *m_thread;

    virtual void work() = 0;

    void do_work() 
        work();
    

public:
    Thread() : m_thread(NULL) 
    virtual ~Thread() 
        catch_up();
        if(m_thread != NULL) 
            delete m_thread;
        
    

    inline void catch_up() 
        if(m_thread != NULL) 
            m_thread->join();
        
    

    void run() 
        m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
    
;

/** Thread-safe datas **********************************************************/

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

template <class T>
class Thread_safe_value : public boost::noncopyable 
    T m_value;
    boost::mutex m_mutex;

    public:
        T const &get() 
            boost::mutex::scoped_lock lock(m_mutex);
            return m_value;
        
        void set(T const &value) 
            boost::mutex::scoped_lock lock(m_mutex);
            m_value = value;
        
;

template<typename Data>
class Concurrent_queue 
    std::queue<Data> m_queue;
    mutable boost::mutex m_mutex;
    boost::condition_variable m_cond;

public:
    void push(Data const& data) 
        boost::mutex::scoped_lock lock(m_mutex);
        m_queue.push(data);
        lock.unlock();
        m_cond.notify_one();
    

    bool empty() const 
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.empty();
    

    void wait_and_pop(Data& popped) 
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty()) 
            m_cond.wait(lock);
        

        popped = m_queue.front();
        m_queue.pop();
    
;

#endif // THREAD_HPP_INCLUDED

这个项目很重要,如果你能帮助我,我将不胜感激 =)

先谢谢了。

问候,

神秘先生。

【问题讨论】:

您能用pattern或几句话概括一下线程设计吗?是生产者/消费者吗? @Sam Miller:我不知道 valgrind,我会尝试安装它。 @jdv:你是什么意思?这很简单:想象有两个人在一条生产线上工作,并且必须将箱子从第一个后面移到第二个后面。第一个(阅读器)在他身后拿一个盒子(这是阅读)并将其放在第二个(这是阅读队列)的前面。第二个无法到达它,所以有一个小型跑步机(主循环,通常处理)将它(通过wait_and_pull()和push())带到第二个脚下(待写队列),谁最终把它放在他身后(这是写作),等等。 以后会有几个reader,一个processing(会等待readers给每一个box)和一个writer。 【参考方案1】:

您的完成逻辑有错误。

main() 循环正在读取队列中的最后一个条目,并在设置 m_finished 标志之前阻塞等待下一个条目。

如果您在调用 m_finished.set(true) 之前等待很长时间(例如 Linux 上的 sleep(5) 或 Windows 上的 Sleep(5000) 等待 5 秒),那么您的代码每次都会挂起。

(这并没有解决段错误或内存分配错误,这可能是其他问题)

有问题的执行是这样的:

    读取器线程从文件中读取最后一项并推送到队列中。 主线程从队列中弹出最后一项。 主线程为写入线程推送队列中的最后一项。 主线程循环; m_finished 没有设置,所以它调用wait_and_pop。 阅读器线程意识到它在文件末尾并设置m_finished。 主线程现在被阻塞,等待阅读器队列中的另一个项目,但阅读器不会提供一个。

睡眠调用通过在读取器线程上的步骤 1 和 5 之间放置很大的延迟来强制执行此事件顺序,因此主线程有很多机会执行步骤 2-4。对于竞态条件,这是一种有用的调试技术。

【讨论】:

感谢您的回答 =) 我不明白,您是什么意思?我不能承受几秒钟的睡眠时间,我有一个性能目标......如果这就是解决方案的目的(我不明白为什么它应该工作,所以我无论如何都进行了测试,这并没有改变任何东西: / )。 睡眠是一种调试辅助。我已经更新了上面的答案以反映这一点。 我的理解是,当队列为空但 m_finished 即将设置为 true 时,我们无论如何都会进入循环,wait_and_pop() 等待队列中不会到达的项目,因为阅读器现在完成了。我使用 bool try_pop(Data& popped) 而不是 void wait_and_pop(Data& popped) 解决了这个问题,它仅在队列不为空时才会弹出。因此,在循环中: if(reader.try_pop(row)) writer.push(row); 奇迹般有效。我将在 Unix 下尝试看看它是否修复了其他错误。谢谢你让我大开眼界:) 哦,如果有什么让你震惊的,请告诉我!我不允许犯任何错误。 使用 try_pop 会起作用,但会失去阻塞队列的好处,因为主线程将消耗 CPU 时间循环和检查新消息。通常值得拥有一个特殊的“哨兵”值,您可以将其发布到队列中以指示“没有更多数据”。这也避免了对单独的“完成”标志的需要。【参考方案2】:

在快速阅读后,我发现的唯一明显问题可能是您的部分(但可能不是全部)问题,因为您没有在Concurrent_queue::push 中正确地表明您的状况。

任何时候你发现自己在作用域互斥体上调用unlock() 都应该向你表明有问题了。使用范围互斥锁的要点之一是当对象进入/离开范围时锁定和解锁是隐式的。如果您发现自己需要解锁,则可能需要重组代码。

不过,在这种情况下,您实际上不需要重构代码。在这种情况下,解锁是错误的。当发出条件信号时,需要锁定互斥锁。信号发生后解锁。所以你可以用这个代码替换解锁:

void push(Data const& data) 
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push(data);
    m_cond.notify_one();

当函数返回时,在条件发出信号后,它将解锁互斥锁。

【讨论】:

你确定吗?我认为编写该类代码的人(这是代码中唯一不属于我的部分:justsoftwaresolutions.co.uk/threading/…)希望在通知之前确保锁已解锁……除此之外,这不会改变任何事情在 Windows 上(我暂时无法在 Unix 上尝试):线程一遍又一遍地运行,并且永远不会返回 3/5 次。 谢谢你花时间回答我,我忘了礼貌^^

以上是关于用cshell怎么逐行读文件逐行进行处理的主要内容,如果未能解决你的问题,请参考以下文章

c语言如何读写二进制

c语言如何进行二进制文件的操作?

delphi中怎么逐行读取文本文件的数据并将每行分别写入指定的不同编辑框

python处理txt文件的若干脚本

shell读取文件的每一行内容并输出

请问一下 怎么用c语言实现读取一个txt文件里的数据 要按行读出来