如何定期将 c# FileStream 刷新到磁盘?

Posted

技术标签:

【中文标题】如何定期将 c# FileStream 刷新到磁盘?【英文标题】:How to periodically flush c# FileStream to the disk? 【发布时间】:2017-04-16 13:33:45 【问题描述】:

上下文

我正在为 Web API 项目实现一个日志记录机制,该项目将序列化对象从多个方法写入文件,然后由外部进程读取(nxLog 更准确)。该应用程序托管在 IIS 上并使用 18 个工作进程。应用程序池每天回收一次。将包含日志记录方法的服务的预期负载为 10,000 req/s。简而言之,这是一个经典的produces/consumer 问题,涉及多个生产者(产生日志的方法)和一个消费者(从日志文件中读取的外部进程)。 更新:每个进程也使用多个线程。

我使用BlockingCollection 来存储数据(并解决竞争条件)以及将数据从集合写入磁盘的长时间运行的任务。

要写入磁盘,我使用StreamWriterFileStream。 因为写入频率几乎是恒定的(正如我所说的每秒 10,000 次写入),我决定在应用程序池的整个生命周期内保持流打开,并定期将日志写入磁盘。我每天依靠 App Pool 回收和我的 DI 框架来处理我的记录器。另请注意,此类将是单例的,因为我不希望有多个线程专门用于从我的线程池中写入。

显然 FileStream 对象在释放之前不会写入磁盘。现在我不希望 FileStream 等待一整天直到它写入磁盘。保存所有序列化对象所需的内存将是巨大的,更不用说应用程序或服务器上的任何崩溃都会导致数据丢失或文件损坏。

现在我的问题

如何让底层流(FileStream 和 StreamWriter)定期写入磁盘而不释放它们?我最初的假设是,一旦 FileSteam 超过其缓冲区大小(默认为 4K),它就会写入磁盘。

更新:已修复答案中提到的不一致之处。

代码

public class EventLogger: IDisposable, ILogger

    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    public EventLogger()
                
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    
    private void OpenFile()
    
        _fs?.Dispose();
        _sw?.Dispose();            
        _logFilePath = $"D:\Log\logDateTime.Now.ToString(yyyyMMdd)System.Diagnostic.Process.GetCurrentProcess().Id.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs);
    
    public void Dispose()
                
         _queue?.CompleteAdding();
         _consumerTask?.Wait();            
         _sw?.Dispose();
         _fs?.Dispose();
         _queue?.Dispose();            

    
    public void Log(List<string> list)
    
        try
                       
            _queue.TryAdd(list, 100);               

        
        catch (Exception e)
        
            LogError(LogLevel.Error, e);
        
    
    private void Write()
    
        foreach (List<string> items in _queue.GetConsumingEnumerable())
                       
            items.ForEach(item =>
                                
                _sw?.WriteLine(item);                    
            );
        

    

【问题讨论】:

***.com/questions/1059142/…, msdn.microsoft.com/en-us/library/… 我在这里遗漏了什么吗? FileStream 是否没有实现完全执行此操作的刷新方法?或者您是说当您尝试冲洗时会产生这种特殊的蒸汽? @Charleh 你是对的。它完成了这项工作。它清除缓冲区并写入磁盘。问题是,我不想每次写入都清除缓冲区并写入磁盘。我仍然想缓冲高达 4K 的数据,然后“定期”写入磁盘。 @CodeCaster AutoFlush 设置为 true 时,StreamWriter 将缓冲区刷新到其底层流,在我的例子中是 FileStream。但是 FileStream 仍然保留数据而不将其写入磁盘! 为什么不让另一个线程定期检查缓冲区的长度并在缓冲区足够满时刷新?你已经完成了所有困难的线程工作,现在你只需要一个锁来防止在刷新期间写入。当涉及到多线程时,我不是问的最佳人选,但我假设您可以在没有锁的情况下读取缓冲区长度,但我想象文件流不是线程安全的 【参考方案1】:

您的问题存在一些“不一致”。

应用程序托管在 IIS 上并使用 18 个工作进程

.

_logFilePath = $"D:\Log\logDateTime.Now.ToString(yyyyMMdd)System.Diagnostic.Process.GetCurrentProcess().Id.txt";

通过多种方法将序列化对象写入文件

将所有这些放在一起,您似乎遇到了单线程情况,而不是多线程情况。而且由于每个进程都有一个单独的日志,因此不存在争用问题或需要同步。我的意思是,我根本不明白为什么需要BlockingCollection。您可能忘记提及您的 Web 进程中有多个线程。我会在这里做这个假设。

另一个问题是你的代码无法编译

    类名是Logger,但EventLogger 函数看起来像一个构造函数。 一些更不正确的字符串语法等

抛开所有这些,如果您确实遇到争用情况并希望通过多个线程或进程写入同一个日志,那么您的类似乎拥有您需要的大部分内容。我已经修改了你的课程来做更多的事情。主要需要注意的是以下项目

    修复了所有假设的语法错误 添加了一个定时器,它会定期调用刷新。这将需要一个lock 对象,以免中断写操作 在StreamWriter 构造函数中使用了显式缓冲区大小。您应该启发式地确定最适合您的尺寸。此外,您应该从StreamWriter 禁用AutoFlush,这样您的写入就可以命中缓冲区而不是文件,从而提供更好的性能。

以下是修改后的代码

public class EventLogger : IDisposable, ILogger 
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    private System.Timers.Timer _timer;
    private object streamLock = new object();

    private const int MAX_BUFFER = 16 * 1024;      // 16K
    private const int FLUSH_INTERVAL = 10 * 1000;  // 10 seconds

    public  EventLogger() 
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

    

    void SetupFlushTimer() 
        _timer = new System.Timers.Timer(FLUSH_INTERVAL);
        _timer.AutoReset = true;
        _timer.Elapsed += TimedFlush;
    

    void TimedFlush(Object source, System.Timers.ElapsedEventArgs e) 
        _sw?.Flush();
    

    private void OpenFile() 
        _fs?.Dispose();
        _sw?.Dispose();
        var _logFilePath = $"D:\\Log\\logDateTime.Now.ToString("yyyyMMdd")System.Diagnostics.Process.GetCurrentProcess().Id.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs, Encoding.Default, MAX_BUFFER); // TODO: use the correct encoding here
        _sw.AutoFlush = false;
    

    public void Dispose() 
        _timer.Elapsed -= TimedFlush;
        _timer.Dispose();

        _queue?.CompleteAdding();
        _consumerTask?.Wait();
        _sw?.Dispose();
        _fs?.Dispose();
        _queue?.Dispose();

    
    public void Log(List<string> list) 
        try 
            _queue.TryAdd(list, 100);

         catch (Exception e) 
            LogError(LogLevel.Error, e);
        
    

    private void Write() 
        foreach (List<string> items in _queue.GetConsumingEnumerable()) 
            lock (streamLock) 
                items.ForEach(item => 
                    _sw?.WriteLine(item);
                );
            
        

    

编辑: 有 4 个因素控制着这种机制的性能,了解它们的关系很重要。下面的例子希望能清楚地说明

我们说

List&lt;string&gt; 的平均大小为 50 字节 呼叫/秒为 10,000 MAX_BUFFER 是 1024 * 1024 字节(1 兆)

您每秒产生 500,000 字节的数据,因此 1 Meg 缓冲区只能容纳 2 秒的数据。即,即使FLUSH_INTERVAL 设置为 10 秒,当缓冲区空间不足时,缓冲区也会每 2 秒(平均)自动刷新一次。

还请记住,盲目地增加MAX_BUFFER 将无济于事,因为由于缓冲区大小较大,实际的刷新操作将花费更长的时间。

要了解的主要内容是,当传入数据速率(到您的 EventLog 类)和传出数据速率(到磁盘)存在差异时,您将需要一个无限大小的缓冲区(假设连续运行的进程) 否则你将不得不放慢你的平均速度。传入率匹配平均。出境率

【讨论】:

感谢维克拉姆的建议!你的假设是正确的:我每个进程有多个线程。我要更新我的问题。至于语法错误,这是简化我的源代码的失败尝试。你是对的! 至于使用定时器的解决方案,我觉得是个好主意。我要试一试。【参考方案2】:

也许我的回答无法解决您的具体问题,但我相信您的场景可能是 memory-mapped files 的一个很好的用例。

持久化文件是与 磁盘上的源文件。当最后一个进程完成工作时 文件,数据保存到磁盘上的源文件中。这些 内存映射文件适用于处理非常大的文件 源文件。

这可能非常有趣,因为您将能够从不同的进程(即 IIS 工作进程)进行日志记录,而不会出现锁定问题。见MemoryMappedFile.OpenExisting方法。

此外,您可以记录到非持久共享内存映射文件,并且使用任务调度程序或 Windows 服务,您可以使用持久内存映射文件将挂起的日志发送到最终目的地。

由于您的多进程/跨进程场景,我看到了使用这种方法的巨大潜力。

方法#2

如果您不想重新发明***,我会选择可靠的消息队列,例如 MSMQ(非常基本,但在您的场景中仍然有用)或 RabbitMQ。将日志放入持久队列中,后台进程可能会使用这些日志队列将日志写入文件系统。

通过这种方式,您可以创建一次、一天两次或任何时候创建日志文件,并且在记录系统内的操作时不会与文件系统绑定。

【讨论】:

感谢 Matias 的建议。我喜欢第一个解决方案。我不知道内存映射文件,我会试一试。虽然我的文件不应该那么大。第二种方法也很有趣。我们已经在使用一个类似于队列的系统,称为 Hangfire。不太一样,但使用它可能不是一个坏主意。【参考方案3】:

使用 FileStream.Flush() 方法 - 您可以在每次调用 .Write 后执行此操作。它将清除流的缓冲区并导致将任何缓冲的数据写入文件。

https://msdn.microsoft.com/en-us/library/2bw4h516(v=vs.110).aspx

【讨论】:

写入后刷新可能会出现性能问题,尤其是在每秒写入 10,000 个条目的情况下。这就是流不直接写入磁盘,而是先将数据缓冲到内存的确切原因;这是一个缓慢的操作。更好的选择可能是使用计时器每分钟定期刷新缓冲区,或者在每 x 个整数后刷新。 正是@BradleyUffner! @BradleyUffner 这是一种可能性,但也许不是。对存储介质的实际写入取决于系统上安装的硬件和驱动程序。硬件本身可能有一个巨大的缓冲区。 OP 的目标是从应用程序 中获取数据。将其移交给操作系统后,可能仍需要调整性能,无论是否将刷新放在计时器上。

以上是关于如何定期将 c# FileStream 刷新到磁盘?的主要内容,如果未能解决你的问题,请参考以下文章

C# 流总结

C# 计算输入和输出 FileStream 的 MD5

C# 流总

如何在 C# 中清空/刷新 Windows READ 磁盘缓存?

C#流总结(文件流内存流网络流BufferedStreamStreamReader/StreamWriterTextReader/TextWriter)

C#流总结(文件流内存流网络流BufferedStreamStreamReader/StreamWriterTextReader/TextWriter)