ReactiveUI:如何实现定期刷新的ReactiveList

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReactiveUI:如何实现定期刷新的ReactiveList相关的知识,希望对你有一定的参考价值。

我是Reactive Extensions的新手,我还在努力学习。

我正在开发一个带有数据网格的应用程序,该应用程序显示某些正在运行的Windows进程及其内存使用情况应该频繁更新每个进程的内存使用量,即每200毫秒。

要求

  • 选中复选框时 数据网格应填充进程,并使用间隔为200毫秒的计时器更新内存使用情况。 监视器(所有应该在后台线程上完成)

- 如果进程已退出,则应将其从源中删除。

- 如果进程启动,则应将其添加到源中

- 一个变更文件

  • 取消选中复选框时 应停止所有监视器活动 数据网格已清除

任何帮助将不胜感激!笔记:

  • In the past我尝试了几种方法,比如使用ObservableConcurrentDictionary作为资源和定时器来定期更新资源,但我遇到了麻烦(并发,锁定等),所以我想有一个基于Rx / ReactiveUI的解决方案
  • 做技术限制我只能使用.NET Framework 4.0,Reactive-core.Net40

更新

视图模型

private ReactiveList<IProcessModel> _processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
public ReactiveList<IProcessModel> Processes { get { return _processes; } }

public MainViewModel(IMonitorService monitorService)
{
   this.WhenAnyValue(vm => vm.ShowProcessesIsChecked).Subscribe((b) => DoShowProcesses(b));
}


private void DoShowProcesses(bool checkboxChecked)
{
    IDisposable timer;
    Processes.Clear();
    if (checkboxChecked)
    {
        //checkbox checked
        lock (Processes)
            Processes.AddRange(_monitorService.GetProcesses());
        timer = Observable.Timer(TimeSpan.FromMilliseconds(200.0))
            .Select(x =>
        {
            lock (Processes)
            {
                foreach (var process in Processes) //throws the 'Collection was modified; enumeration operation may not execute.'
                    process.UpdateMemory(); 

                return Processes.Where(p => p.ProcessObject.HasExited).ToList();
            }
        }).
        ObserveOnDispatcher()
        .Subscribe(processesExited =>
        {
            if (processesExited.Count() > 0)
            {
                lock (Processes)
                    Processes.RemoveAll(processesExited); //remove all processes that have exited
            }

        });
    }
    else
    {
        if (timer != null)
            timer.Dispose();
    }
}

我开始了new thread

原版的

视图模型

public class MainViewModel : ReactiveObject
{
    public ReactiveList<IProcessModel> Processes { get; private set; }
    IMonitorService _monitorService;

    public MainViewModel(IMonitorService monitorService)
    {
        _monitorService = monitorService;

        Processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == true) //checkbox checked
            .ObserveOn(Scheduler.Default) //raise notifications on thread-pool thread to keep UI responsive
            .Select((isChecked) =>
            {
                return monitorService.GetProcesses();
            })
            .ObserveOn(SynchronizationContext.Current)
            .Subscribe(processes => {
                Processes.AddRange(processes); }
            );
        //start the MonitorService with MonitorService.Start(Processes)
        //start a timer with an interval of 200ms --> at interval
        //- do UpdateMemory() foreach IProcessModel in Processes
        //- if ProcessObject.HasExited --> remove it from the collection source
        ;
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == false) //checkbox unchecked
            .Subscribe((isChecked) =>
            {
                monitorService.Stop(); //this stops monitoring for starting processes and clears the Processes
            });
    }

    private bool _showProcessesIsChecked;

    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}

模型

public class ProcessModel : ProcessModelBase, IProcessModel
{

    public ProcessModel(Process process)
    {
        ProcessObject = process;
    }      

    public void UpdateMemory()
    {
        try
        {
            if (!ProcessObject.HasExited)
            {
                long mem = ProcessObject.PagedMemorySize64;
                ProcessObject.Refresh();
                if (mem != ProcessObject.PagedMemorySize64)
                    OnPropertyChanged(nameof(ProcessObject));
            }
        }
        catch (Exception)
        {
            //log it
        }
    }
}

服务

public class MonitorService : IMonitorService
{
    ManagementEventWatcher managementEventWatcher;
    ReactiveList<IProcessModel> _processes;

    public List<IProcessModel> GetProcesses()
    {
        List<IProcessModel> processes = new List<IProcessModel>();

        foreach (var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
            processes.Add(new ProcessModel(process));
        return processes;
    }

    /// <summary>
    /// Starts the manager. Monitor a starting process and changes in log file
    /// </summary>
    /// <param name="processes"></param>
    public void Start(ReactiveList<IProcessModel> processes)
    {
        _processes = processes;

        var qStart = "SELECT * FROM Win32_ProcessStartTrace WHERE ProcessName like 'chrome'";
        managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        managementEventWatcher.EventArrived += new EventArrivedEventHandler(OnProcessStarted);
        try
        {
            managementEventWatcher.Start();
        }
        catch (Exception)
        {
            //log it
        }
        Task.Factory.StartNew(() => MonitorLogFile());
    }


    public void Stop()
    {
        if (managementEventWatcher != null)
            managementEventWatcher.Stop();
        if (_processes != null)
            _processes.Clear();
    }

    private void MonitorLogFile()
    {
        //this code monitors a log file for changes. It is possible that the IsChecked property of a ProcessModel object is set in the Processes collection
    }


    private void OnProcessStarted(object sender, EventArrivedEventArgs e)
    {

        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            _processes.Add(new ProcessModel(process));
        }
        catch (ArgumentException)
        {
            //log it
        }
        catch (InvalidOperationException)
        {
            //log it
        }

    }
}

XAML

<CheckBox Content='Show Processes' IsChecked='{Binding ShowProcessesIsChecked}' />
<DataGrid  ItemsSource="{Binding Processes}">
    <DataGrid.Resources>
      <DataGridTemplateColumn Header='Process'
                              x:Key='dgProcessName'
                              IsReadOnly='True'
                              x:Shared='False'>
        <DataGridTemplateColumn.CellTemplate>
            <DataTemplate>
                <StackPanel Orientation='Horizontal' VerticalAlignment='Center'>
                    <CheckBox IsChecked='{Binding IsChecked, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}' HorizontalAlignment='Stretch' VerticalAlignment='Stretch'> </CheckBox>
                    <TextBlock Text='{Binding ProcessObject.ProcessName}' />
                </StackPanel>
            </DataTemplate>
        </DataGridTemplateColumn.CellTemplate>
          </DataGridTemplateColumn>
          <DataGridTextColumn Header="PID"
                              Binding="{Binding ProcessObject.Id}"
                              IsReadOnly='True'
                              x:Key='dgPID'
                              x:Shared='False' />
          <DataGridTextColumn Header="Commit Size"
                              Binding='{Binding ProcessObject.PagedMemorySize64}'
                              IsReadOnly='True'
                              x:Key='dgCommitSize'
                              x:Shared='False' />
    </DataGrid.Resources>
</DataGrid>
答案

你基本上想要使用这种模式:

IObservable<bool> checkBoxChecked = /* your checkbox observable here */
IObservable<long> timer = Observable.Interval(TimeSpan.FromMilliseconds(200.0));

IObservable<long> query =
    checkBoxChecked
        .Select(x => x ? timer : Observable.Never<long>().StartWith(-1L))
        .Switch();

IDisposable subscription =
    query
        .Subscribe(n =>
        {
            if (n == -1L)
            {
                // Clear UI
            }
            else
            {
                // Update UI
            }
        });

这将根据复选框的值在运行和未运行之间切换。

你需要确保你在UI线程上观察,但除了一些小的调整之外,这应该可以正常工作。

另一答案

无论如何我想练习我的Rx技能,所以我继续创建了一个WPF项目并给了它一个机会。我得到了它的工作,所以我将分享我如何去做。

  • 从MonitorService中删除进程列表。这将有助于隔离列表修改源,使我们的调试生活更轻松。它还将MonitorService的职责范围缩小到提供初始列表和发布更改。
  • 我们已经“被动”了,所以我们也可以使用FromEventPattern将EventArrived事件变成一个可观察的事件。然后我们可以将这些被触发的事件发送到ProcessModel并将它们推送给订户。
  • 我将ManagementEventWatcher创建移动到构造函数,因此每次选中复选框时都不必重新创建它。现在,Start / Stop方法现在只是_managementEventWatcher版本的包装器。

.

transform

MainViewModel

  • 为了避免错误,例如'收集被修改;枚举操作可能无法执行'一个简单的解决方案是public class MonitorService { ManagementEventWatcher _managementEventWatcher; public IObservable<ProcessModel> NewProcessObservable { get; } public MonitorService() { var qStart = "SELECT * FROM Win32_ProcessStartTrace where ProcessName='chrome.exe'"; _managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart)); var eventArrivedObservable = Observable.FromEventPattern<EventArrivedEventHandler, EventArrivedEventArgs>( x => _managementEventWatcher.EventArrived += x, x => _managementEventWatcher.EventArrived -= x); NewProcessObservable = eventArrivedObservable .Select(x => GetProcessModel(x.EventArgs)) .Where(x => x != null); } public List<ProcessModel> GetProcesses() { List<ProcessModel> processes = new List<ProcessModel>(); foreach(var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome"))) processes.Add(new ProcessModel(process)); return processes; } public void Start() { try { _managementEventWatcher.Start(); } catch(Exception ex) { Console.WriteLine(ex.Message); } } public void Stop() { if(_managementEventWatcher != null) _managementEventWatcher.Stop(); } private ProcessModel GetProcessModel(EventArrivedEventArgs e) { ProcessModel model = null; try { Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value)); model = new ProcessModel(process); } catch(ArgumentException) { //log it } catch(InvalidOperationException) { //log it } return model; } }
  • 紧接在每个订阅调用之前,使用.ObserveOnDispatcher(),以便onNext调用将在UI线程上执行。很酷的部分是我们现在修改列表的唯一地方是订阅内部。所以,我们不必使用锁和所有这些。
  • 我将逻辑划分为3个不同的订阅:启动/停止监视,更新内存使用并删除已退出的进程(使用@Enigmativity的建议模式),并将新启动的进程添加到我们的被动列表中。希望这可以很容易地遵循逻辑。

.

iterate backward using a for loop

更多信息如果这是您应用程序中唯一的页面/窗口,那么这就足够了。如果没有,你还有一些工作要做,以避免内存泄漏。在后一种情况下,我建议谷歌搜索public class MainViewModel : ReactiveObject { public ReactiveList<ProcessModel> Processes { get; private set; } MonitorService _monitorService; public MainViewModel(MonitorService monitorService) { _monitorService = monitorService; Processes = new ReactiveList<ProcessModel>() { ChangeTrackingEnabled = true }; RxApp.SupportsRangeNotifications = false; IObservable<bool> checkboxObservable = this.WhenAnyValue(vm => vm.ShowProcessesIsChecked); IObservable<long> intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(200.0)); // Start/stop the monitoring. checkboxObservable // Skip the default unchecked state. .Skip(1) .ObserveOnDispatcher() .Subscribe( isChecked => { if(isChecked) { Processes.AddRange(monitorService.GetProcesses()); monitorService.Start(); } else { Processes.Clear(); monitorService.Stop(); } }); // Update the memory usage and remove processes that have exited. checkboxObservable .Select(isChecked => isChecked ? intervalObservable : Observable.Never<long>()) // Switch disposes of the previous internal observable // (either intervalObservable or Never) and "switches" to the new one. .Switch() .ObserveOnDispatcher() .Subscribe( _ => { // Loop backwards in a normal for-loop to avoid the modification errors. for(int i = Processes.Count - 1; i >= 0; --i) { if(Processes[i].ProcessObject.HasExited) { Processes.RemoveAt(i); } else { Processes[i].UpdateMemory(); } } }); // Add newly started processes to our reactive list. monitorService.NewProcessObservable .Where(_ => ShowProcessesIsChecked) .ObserveOnDispatcher() .Subscribe(process => Processes.Add(process)); } private bool _showProcessesIsChecked; public bool ShowProcessesIsChecked { get { return _showProcessesIsChecked; } set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); } } } (甚至可能在那里扔wpf)。您将找到许多可以使用和学习的示例。

以上是关于ReactiveUI:如何实现定期刷新的ReactiveList的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming:如何定期刷新缓存的 RDD?

用于实现ReactUI.XamForms主细节页面的模式

如何定期将 c# FileStream 刷新到磁盘?

如何在 Avalonia.ReactiveUI 中使用 Autofac 作为 DI 容器?

Xamarin.Forms + ReactiveUI导航,IRoutableViewModel删除/隐藏包含UrlPathSegment内容的标题栏

如何将 RShiny reactiveFileReader 与 reactiveUI 和不存在的文件一起使用?