IO读写操作的TPL Dataflow实现中的内存问题

Posted

技术标签:

【中文标题】IO读写操作的TPL Dataflow实现中的内存问题【英文标题】:Memory issue in TPL Dataflow implementation of IO read write operation 【发布时间】:2016-03-29 10:01:54 【问题描述】:

我尝试使用文件IO操作来实现读写操作,并将这些操作封装到TransformBlock中,以使这些操作线程安全,而不是使用锁定机制。

但问题是,当我尝试并行写入 5 个文件时,内存会出现异常,并且在使用此实现时会阻塞 UI 线程。实现是在 Windows Phone 项目中完成的。请提出此实施中的问题。

文件 IO 操作

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions

    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
;

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions

    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
;

public static async Task<T> LoadAsync<T>(string fileName)

    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName =>
       
           return await LoadData<T>(fName);
       , concurrentExecutionDataFlow);

    transBlock.Post(fileName);

    result = await transBlock.ReceiveAsync();

    return result;


public static async Task SaveAsync<T>(T obj, string fileName)

    var transBlock = new TransformBlock<Tuple<T, string>, Task>
       (async tupleData =>
       
          await SaveData(tupleData.Item1, tupleData.Item2);
       , exclusiveExecutionDataFlow);

    transBlock.Post(new Tuple<T, string>(obj, fileName));

    await transBlock.ReceiveAsync();

MainPage.xaml.cs 用法

private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd"
private static string fileName = string.Empty;
private List<string> DataLstSample = new List<string>();
private ObservableCollection<string> TestResults = new ObservableCollection<string>();
private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
List<Task> allTsk = new List<Task>();
private Random rand = new Random();
private string  fileNameRand

    get
    
        return rand.Next(100).ToString();
    


public MainPage()

    InitializeComponent();

    for (int i = 0; i < 5; i ++)
    
        DataLstSample.Add((i % 2) == 0 ? data : data1);
    



private void Button_Click(object sender, RoutedEventArgs e)

    AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();


public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

    TstRst.Text = "InProgress..";
    allTsk.Clear();

    foreach(var data in DataLstSample)
    
        var fName = fileNameRand;

        var t = Task.Run(async () =>
        
            await AppIsolatedStore.SaveAsync<string>(data, fName);
        );

        TestResults.Add(string.Format("Writing file name: 0, data: 1", fName, data));
        allTsk.Add(t);
    

    await Task.WhenAll(allTsk);

    TstRst.Text = "Completed..";

异步保存和加载数据

        /// <summary>
        /// Load object from file
        /// </summary>
        private static async Task<T> LoadData<T>(string fileName)
        

            T result = default(T);

            try
            
                if (!string.IsNullOrWhiteSpace(fileName))
                
                    using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
                    
                        var data = await _file.ReadTextAsync(file);

                        if (!string.IsNullOrWhiteSpace(data))
                        
                            result = JsonConvert.DeserializeObject<T>(data);
                        
                    
                
            
            catch (Exception ex)
            
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occured while loading data : 0", ex.Message);
            
            finally
            

            

            return result;
        


        /// <summary>
        /// Save object from file
        /// </summary>
        private static async Task SaveData<T>(T obj, string fileName)
        
            try
            
                if (obj != null && !string.IsNullOrWhiteSpace(fileName))
                
                    //Serialize object with JSON or XML serializer
                    string storageString = JsonConvert.SerializeObject(obj);

                    if (!string.IsNullOrWhiteSpace(storageString))
                    
                        //Write content to file
                        await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
                    
                
            
            catch (Exception ex)
            
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occured while saving the data : 0", ex.Message);
            
            finally
            
            
        

编辑:

它出现内存异常的原因是我获取的数据字符串太大的一个原因。字符串为链接:http://1drv.ms/1QWSAsc

但第二个问题是,如果我也添加小数据,那么它会阻塞 UI 线程。代码是否在 UI 上执行任何任务?

【问题讨论】:

【参考方案1】:

不,您使用使用默认线程池的并发对来执行任务,并且您使用Run 方法实例化任务,所以问题不在这里。但是您这里的代码有两个主要威胁:

var transBlock = new TransformBlock<string, T>
   (async fName =>
   
       // process file here
   , concurrentExecutionDataFlow);

你真的不应该每次都创建transBlockTPL Dataflow 的主要思想是您创建块一次,然后使用它们。所以你应该重构你的应用程序以减少你正在实例化的块的数量,否则这不是TPL Dataflow应该使用的情况。

您的代码中的另一个威胁是您明确阻止了线程!

// Right here
await Task.WhenAll(allTsk);
TstRst.Text = "Completed..";

从同步事件处理程序的async void 方法中为任务调用await 会阻塞线程,默认情况下是it captures the synchronization context。首先,async void should be avoided。其次,如果你是异步的,你should be async all the way,所以事件处理程序也应该是异步的。第三,您可以使用 continuation for your task 来更新您的 UI 或 use current synchronization context。

所以,你的代码应该是这样的:

// store the sync context in the field of your form
SynchronizationContext syncContext = SynchronizationContext.Current;

// avoid the async void :)
public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

// make event handler async - this is the only exception for the async void use rule from above
private async void Button_Click(object sender, RoutedEventArgs e)

// asynchronically wait the result without capturing the context
await Task.WhenAll(allTsk).ContinueWith(
  t => 
    // you can move out this logic to main method
    syncContext.Post(new SendOrPostCallback(o =>
        
            TstRst.Text = "Completed..";
        ));
  
);

【讨论】:

我正在探索在不使用锁定机制的情况下使 IO 操作成为线程安全的方法。为了避免锁定的副作用。正如您所说,我应该减少创建块的数量或使用差异方法。您能否提出一种更好的方法或一种我可以探索更多的新方法。 @BalrajSingh TPL 数据流仍然在内部使用块。手动lock 语句更易读,效率更高 在最后一个例子中,ConfigureAwait 是错误的——这不会编译。此外,不应使用ContinueWith。如果操作使用 TPL 数据流,更惯用的解决方案是最终的 ActionBlock 和 UI TaskScheduler @StephenCleary 谢谢你的补充!我忘记了护腕。【参考方案2】:

您是否尝试过使用 ExecutionDataflowBlockOptions 上的 BoundedCapacity 参数? Introduction to TPL 提到了区块容量:

[...] 边界在数据流网络中很有用,可以避免内存无限 生长。出于可靠性原因,这可能非常重要,如果有 生产者最终可能更快地生成数据的可能性 比消费者可以处理它...

我建议尝试使用此选项来限制已处理项目的排队,看看它是否有助于解决您的内存问题

【讨论】:

我已将 BoundingCapacity 设置为 1,但问题仍然存在。

以上是关于IO读写操作的TPL Dataflow实现中的内存问题的主要内容,如果未能解决你的问题,请参考以下文章

如何在 TPL/Dataflow 中发出笛卡尔积?

TPL Dataflow 管道中的图像刷新问题

如何在简单的 TPL DataFlow 管道中优化性能?

TPL-Dataflow 是不是适用于高并发应用程序?

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow BufferBlock 线程安全吗?