在 Java 的并行线程中写入文件的最佳方法是啥?

Posted

技术标签:

【中文标题】在 Java 的并行线程中写入文件的最佳方法是啥?【英文标题】:What is the best way to write to a file in a parallel thread in Java?在 Java 的并行线程中写入文件的最佳方法是什么? 【发布时间】:2011-06-01 19:16:43 【问题描述】:

我有一个程序可以执行大量计算并经常将它们报告到文件中。我知道频繁的写入操作会大大降低程序速度,因此为避免这种情况,我希望有第二个线程专门用于写入操作。

现在我正在用我写的这个类来做(不耐烦的可以跳到问题的末尾):

public class ParallelWriter implements Runnable 

    private File file;
    private BlockingQueue<Item> q;
    private int indentation;

    public ParallelWriter( File f )
        file = f;
        q = new LinkedBlockingQueue<Item>();
        indentation = 0;
    

    public ParallelWriter append( CharSequence str )
        try 
            CharSeqItem item = new CharSeqItem();
            item.content = str;
            item.type = ItemType.CHARSEQ;
            q.put(item);
            return this;
         catch (InterruptedException ex) 
            throw new RuntimeException( ex );
        
    

    public ParallelWriter newLine()
        try 
            Item item = new Item();
            item.type = ItemType.NEWLINE;
            q.put(item);
            return this;
         catch (InterruptedException ex) 
            throw new RuntimeException( ex );
        
    

    public void setIndent(int indentation) 
        try
            IndentCommand item = new IndentCommand();
            item.type = ItemType.INDENT;
            item.indent = indentation;
            q.put(item);
         catch (InterruptedException ex) 
            throw new RuntimeException( ex );
        
    

    public void end()
        try 
            Item item = new Item();
            item.type = ItemType.POISON;
            q.put(item);
         catch (InterruptedException ex) 
            throw new RuntimeException( ex );
        
    

    public void run() 

        BufferedWriter out = null;
        Item item = null;

        try
            out = new BufferedWriter( new FileWriter( file ) );
            while( (item = q.take()).type != ItemType.POISON )
                switch( item.type )
                    case NEWLINE:
                        out.newLine();
                        for( int i = 0; i < indentation; i++ )
                            out.append("   ");
                        break;
                    case INDENT:
                        indentation = ((IndentCommand)item).indent;
                        break;
                    case CHARSEQ:
                        out.append( ((CharSeqItem)item).content );
                
            
         catch (InterruptedException ex)
            throw new RuntimeException( ex );
         catch  (IOException ex) 
            throw new RuntimeException( ex );
         finally 
            if( out != null ) try 
                out.close();
             catch (IOException ex) 
                throw new RuntimeException( ex );
            
        
    

    private enum ItemType 
        CHARSEQ, NEWLINE, INDENT, POISON;
    
    private static class Item 
        ItemType type;
    
    private static class CharSeqItem extends Item 
        CharSequence content;
    
    private static class IndentCommand extends Item 
        int indent;
    

然后我通过这样做来使用它:

ParallelWriter w = new ParallelWriter( myFile );
new Thread(w).start();

/// Lots of
w.append(" things ").newLine();
w.setIndent(2);
w.newLine().append(" more things ");

/// and finally
w.end();

虽然这很好用,但我想知道: 有没有更好的方法来做到这一点?

【问题讨论】:

类似问题:***.com/questions/8602466/… 【参考方案1】:

您的基本方法看起来不错。我将代码结构如下:

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public interface FileWriter 
        FileWriter append(CharSequence seq);
    
        FileWriter indent(int indent);
    
        void close();
    
    
    class AsyncFileWriter implements FileWriter, Runnable 
        private final File file;
        private final Writer out;
        private final BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
        private volatile boolean started = false;
        private volatile boolean stopped = false;
    
        public AsyncFileWriter(File file) throws IOException 
            this.file = file;
            this.out = new BufferedWriter(new java.io.FileWriter(file));
        
    
        public FileWriter append(CharSequence seq) 
            if (!started) 
                throw new IllegalStateException("open() call expected before append()");
            
            try 
                queue.put(new CharSeqItem(seq));
             catch (InterruptedException ignored) 
            
            return this;
        
    
        public FileWriter indent(int indent) 
            if (!started) 
                throw new IllegalStateException("open() call expected before append()");
            
            try 
                queue.put(new IndentItem(indent));
             catch (InterruptedException ignored) 
            
            return this;
        
    
        public void open() 
            this.started = true;
            new Thread(this).start();
        
    
        public void run() 
            while (!stopped) 
                try 
                    Item item = queue.poll(100, TimeUnit.MICROSECONDS);
                    if (item != null) 
                        try 
                            item.write(out);
                         catch (IOException logme) 
                        
                    
                 catch (InterruptedException e) 
                
            
            try 
                out.close();
             catch (IOException ignore) 
            
        
    
        public void close() 
            this.stopped = true;
        
    
        private static interface Item 
            void write(Writer out) throws IOException;
        
    
        private static class CharSeqItem implements Item 
            private final CharSequence sequence;
    
            public CharSeqItem(CharSequence sequence) 
                this.sequence = sequence;
            
    
            public void write(Writer out) throws IOException 
                out.append(sequence);
            
        
    
        private static class IndentItem implements Item 
            private final int indent;
    
            public IndentItem(int indent) 
                this.indent = indent;
            
    
            public void write(Writer out) throws IOException 
                for (int i = 0; i < indent; i++) 
                    out.append(" ");
                
            
        
    

如果您不想在单独的线程中编写(可能是在测试中?),您可以实现FileWriter,它在调用者线程中的Writer 上调用append

【讨论】:

谢谢,将特定于项目的任务委派给项目比我的做法更符合 OOP。另外,与毒元素相比,使用this.stopped 结束阅读有什么特别的优势吗? 另外你的缩进操作做了一些稍微不同的事情:我的缩进设置所有未来行的缩进,你的只是在当前位置缩进。 @trutheality 我使用了stopped 变量,因为它是协作停止线程的标准习惯用法。此外,您可以使用它来防止在调用end 之后调用append。我在贴出的原代码中误解了缩进操作的功能。 @trutheality 我也对所有不变异的成员使用finals。唯一会发生变异的是队列和标志。队列是一个众所周知的标准类,它是线程安全的。写入标志时,它们的新值不依赖于旧值 - 因此将它们标记为volatile 是安全的。我认为这使得总体上更容易推理线程安全。 一个简短的问题 - 我认为作者不会写下提供给它的所有输入;但是一旦调用 stop() 方法就会停止,因为它将在 while() 循环条件下进行检查。所以很可能,我们可能需要检查一下(队列是否为空 && 已停止)。【参考方案2】:

与单个消费者线程交换数据的一种好方法是使用 Exchanger。

您可以使用 StringBuilder 或 ByteBuffer 作为与后台线程交换的缓冲区。产生的延迟可能在 1 微秒左右,不涉及创建任何对象,并且使用 BlockingQueue 会更低。

来自我认为值得在这里重复的例子。

class FillAndEmpty 
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable 
     public void run() 
       DataBuffer currentBuffer = initialEmptyBuffer;
       try 
         while (currentBuffer != null) 
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         
        catch (InterruptedException ex)  ... handle ... 
     
   

   class EmptyingLoop implements Runnable 
     public void run() 
       DataBuffer currentBuffer = initialFullBuffer;
       try 
         while (currentBuffer != null) 
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         
        catch (InterruptedException ex)  ... handle ...
     
   

   void start() 
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   
 

【讨论】:

感谢您的想法,它确实教会了我一些新东西。我不知道在我的情况下这是否是正确的方法:我真的不希望生产者等待消费者,这在这里似乎是必要的。 @truheality,只有在消费者跟不上时,生产者才会等待。在这种情况下,您有一个队列可能会隐藏的问题。一旦队列变得太长,您的性能可能会以不可预知的方式受到影响。【参考方案3】:

使用 LinkedBlockingQueue 是个好主意。不确定我是否喜欢代码的某些风格……但原则似乎是合理的。

我可能会为 LinkedBlockingQueue 添加一个容量,等于总内存的某个百分比。比如说 10,000 个项目。这样,如果你的写入速度太慢,你的工作线程将不会继续添加更多工作,直到堆被炸毁了。

【讨论】:

【参考方案4】:

我知道频繁的写操作 可以大大减慢程序速度

如果您使用缓冲,可能没有您想象的那么多。

【讨论】:

以上是关于在 Java 的并行线程中写入文件的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

使用 python 3.6 将多个文件并行加载到内存中的最佳方法是啥?

在 Java 中更改 XML 文件中的一个值的最佳方法是啥?

将大文件写入 S3 的最佳方法是啥?

在 PHP 中将大文件写入磁盘的最佳方法是啥?

使用 APScheduler 在 python 中进行并行编程的最佳方法是啥?

将 StringIO 的内容写入文件的最佳方法是啥?