ReactiveUI:如何实现定期刷新的ReactiveList
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReactiveUI:如何实现定期刷新的ReactiveList相关的知识,希望对你有一定的参考价值。
我是Reactive Extensions的新手,我还在努力学习。
我正在开发一个带有数据网格的应用程序,该应用程序显示某些正在运行的Windows进程及其内存使用情况应该频繁更新每个进程的内存使用量,即每200毫秒。
要求
- 选中复选框时 数据网格应填充进程,并使用间隔为200毫秒的计时器更新内存使用情况。 监视器(所有应该在后台线程上完成)
- 如果进程已退出,则应将其从源中删除。
- 如果进程启动,则应将其添加到源中
- 一个变更文件
- 取消选中复选框时 应停止所有监视器活动 数据网格已清除
任何帮助将不胜感激!笔记:
- In the past我尝试了几种方法,比如使用ObservableConcurrentDictionary作为资源和定时器来定期更新资源,但我遇到了麻烦(并发,锁定等),所以我想有一个基于Rx / ReactiveUI的解决方案
- 做技术限制我只能使用.NET Framework 4.0,Reactive-core.Net40
更新
视图模型
private ReactiveList<IProcessModel> _processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
public ReactiveList<IProcessModel> Processes { get { return _processes; } }
public MainViewModel(IMonitorService monitorService)
{
this.WhenAnyValue(vm => vm.ShowProcessesIsChecked).Subscribe((b) => DoShowProcesses(b));
}
private void DoShowProcesses(bool checkboxChecked)
{
IDisposable timer;
Processes.Clear();
if (checkboxChecked)
{
//checkbox checked
lock (Processes)
Processes.AddRange(_monitorService.GetProcesses());
timer = Observable.Timer(TimeSpan.FromMilliseconds(200.0))
.Select(x =>
{
lock (Processes)
{
foreach (var process in Processes) //throws the 'Collection was modified; enumeration operation may not execute.'
process.UpdateMemory();
return Processes.Where(p => p.ProcessObject.HasExited).ToList();
}
}).
ObserveOnDispatcher()
.Subscribe(processesExited =>
{
if (processesExited.Count() > 0)
{
lock (Processes)
Processes.RemoveAll(processesExited); //remove all processes that have exited
}
});
}
else
{
if (timer != null)
timer.Dispose();
}
}
我开始了new thread
原版的
视图模型
public class MainViewModel : ReactiveObject
{
public ReactiveList<IProcessModel> Processes { get; private set; }
IMonitorService _monitorService;
public MainViewModel(IMonitorService monitorService)
{
_monitorService = monitorService;
Processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
.Where(value => value == true) //checkbox checked
.ObserveOn(Scheduler.Default) //raise notifications on thread-pool thread to keep UI responsive
.Select((isChecked) =>
{
return monitorService.GetProcesses();
})
.ObserveOn(SynchronizationContext.Current)
.Subscribe(processes => {
Processes.AddRange(processes); }
);
//start the MonitorService with MonitorService.Start(Processes)
//start a timer with an interval of 200ms --> at interval
//- do UpdateMemory() foreach IProcessModel in Processes
//- if ProcessObject.HasExited --> remove it from the collection source
;
this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
.Where(value => value == false) //checkbox unchecked
.Subscribe((isChecked) =>
{
monitorService.Stop(); //this stops monitoring for starting processes and clears the Processes
});
}
private bool _showProcessesIsChecked;
public bool ShowProcessesIsChecked
{
get { return _showProcessesIsChecked; }
set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
}
}
模型
public class ProcessModel : ProcessModelBase, IProcessModel
{
public ProcessModel(Process process)
{
ProcessObject = process;
}
public void UpdateMemory()
{
try
{
if (!ProcessObject.HasExited)
{
long mem = ProcessObject.PagedMemorySize64;
ProcessObject.Refresh();
if (mem != ProcessObject.PagedMemorySize64)
OnPropertyChanged(nameof(ProcessObject));
}
}
catch (Exception)
{
//log it
}
}
}
服务
public class MonitorService : IMonitorService
{
ManagementEventWatcher managementEventWatcher;
ReactiveList<IProcessModel> _processes;
public List<IProcessModel> GetProcesses()
{
List<IProcessModel> processes = new List<IProcessModel>();
foreach (var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
processes.Add(new ProcessModel(process));
return processes;
}
/// <summary>
/// Starts the manager. Monitor a starting process and changes in log file
/// </summary>
/// <param name="processes"></param>
public void Start(ReactiveList<IProcessModel> processes)
{
_processes = processes;
var qStart = "SELECT * FROM Win32_ProcessStartTrace WHERE ProcessName like 'chrome'";
managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
managementEventWatcher.EventArrived += new EventArrivedEventHandler(OnProcessStarted);
try
{
managementEventWatcher.Start();
}
catch (Exception)
{
//log it
}
Task.Factory.StartNew(() => MonitorLogFile());
}
public void Stop()
{
if (managementEventWatcher != null)
managementEventWatcher.Stop();
if (_processes != null)
_processes.Clear();
}
private void MonitorLogFile()
{
//this code monitors a log file for changes. It is possible that the IsChecked property of a ProcessModel object is set in the Processes collection
}
private void OnProcessStarted(object sender, EventArrivedEventArgs e)
{
try
{
Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
_processes.Add(new ProcessModel(process));
}
catch (ArgumentException)
{
//log it
}
catch (InvalidOperationException)
{
//log it
}
}
}
XAML
<CheckBox Content='Show Processes' IsChecked='{Binding ShowProcessesIsChecked}' />
<DataGrid ItemsSource="{Binding Processes}">
<DataGrid.Resources>
<DataGridTemplateColumn Header='Process'
x:Key='dgProcessName'
IsReadOnly='True'
x:Shared='False'>
<DataGridTemplateColumn.CellTemplate>
<DataTemplate>
<StackPanel Orientation='Horizontal' VerticalAlignment='Center'>
<CheckBox IsChecked='{Binding IsChecked, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}' HorizontalAlignment='Stretch' VerticalAlignment='Stretch'> </CheckBox>
<TextBlock Text='{Binding ProcessObject.ProcessName}' />
</StackPanel>
</DataTemplate>
</DataGridTemplateColumn.CellTemplate>
</DataGridTemplateColumn>
<DataGridTextColumn Header="PID"
Binding="{Binding ProcessObject.Id}"
IsReadOnly='True'
x:Key='dgPID'
x:Shared='False' />
<DataGridTextColumn Header="Commit Size"
Binding='{Binding ProcessObject.PagedMemorySize64}'
IsReadOnly='True'
x:Key='dgCommitSize'
x:Shared='False' />
</DataGrid.Resources>
</DataGrid>
你基本上想要使用这种模式:
IObservable<bool> checkBoxChecked = /* your checkbox observable here */
IObservable<long> timer = Observable.Interval(TimeSpan.FromMilliseconds(200.0));
IObservable<long> query =
checkBoxChecked
.Select(x => x ? timer : Observable.Never<long>().StartWith(-1L))
.Switch();
IDisposable subscription =
query
.Subscribe(n =>
{
if (n == -1L)
{
// Clear UI
}
else
{
// Update UI
}
});
这将根据复选框的值在运行和未运行之间切换。
你需要确保你在UI线程上观察,但除了一些小的调整之外,这应该可以正常工作。
无论如何我想练习我的Rx技能,所以我继续创建了一个WPF项目并给了它一个机会。我得到了它的工作,所以我将分享我如何去做。
- 从MonitorService中删除进程列表。这将有助于隔离列表修改源,使我们的调试生活更轻松。它还将MonitorService的职责范围缩小到提供初始列表和发布更改。
- 我们已经“被动”了,所以我们也可以使用FromEventPattern将EventArrived事件变成一个可观察的事件。然后我们可以将这些被触发的事件发送到ProcessModel并将它们推送给订户。
- 我将ManagementEventWatcher创建移动到构造函数,因此每次选中复选框时都不必重新创建它。现在,Start / Stop方法现在只是_managementEventWatcher版本的包装器。
.
transform
MainViewModel
- 为了避免错误,例如'收集被修改;枚举操作可能无法执行'一个简单的解决方案是
public class MonitorService { ManagementEventWatcher _managementEventWatcher; public IObservable<ProcessModel> NewProcessObservable { get; } public MonitorService() { var qStart = "SELECT * FROM Win32_ProcessStartTrace where ProcessName='chrome.exe'"; _managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart)); var eventArrivedObservable = Observable.FromEventPattern<EventArrivedEventHandler, EventArrivedEventArgs>( x => _managementEventWatcher.EventArrived += x, x => _managementEventWatcher.EventArrived -= x); NewProcessObservable = eventArrivedObservable .Select(x => GetProcessModel(x.EventArgs)) .Where(x => x != null); } public List<ProcessModel> GetProcesses() { List<ProcessModel> processes = new List<ProcessModel>(); foreach(var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome"))) processes.Add(new ProcessModel(process)); return processes; } public void Start() { try { _managementEventWatcher.Start(); } catch(Exception ex) { Console.WriteLine(ex.Message); } } public void Stop() { if(_managementEventWatcher != null) _managementEventWatcher.Stop(); } private ProcessModel GetProcessModel(EventArrivedEventArgs e) { ProcessModel model = null; try { Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value)); model = new ProcessModel(process); } catch(ArgumentException) { //log it } catch(InvalidOperationException) { //log it } return model; } }
。 - 紧接在每个订阅调用之前,使用.ObserveOnDispatcher(),以便onNext调用将在UI线程上执行。很酷的部分是我们现在修改列表的唯一地方是订阅内部。所以,我们不必使用锁和所有这些。
- 我将逻辑划分为3个不同的订阅:启动/停止监视,更新内存使用并删除已退出的进程(使用@Enigmativity的建议模式),并将新启动的进程添加到我们的被动列表中。希望这可以很容易地遵循逻辑。
.
iterate backward using a for loop
更多信息如果这是您应用程序中唯一的页面/窗口,那么这就足够了。如果没有,你还有一些工作要做,以避免内存泄漏。在后一种情况下,我建议谷歌搜索public class MainViewModel : ReactiveObject
{
public ReactiveList<ProcessModel> Processes { get; private set; }
MonitorService _monitorService;
public MainViewModel(MonitorService monitorService)
{
_monitorService = monitorService;
Processes = new ReactiveList<ProcessModel>() { ChangeTrackingEnabled = true };
RxApp.SupportsRangeNotifications = false;
IObservable<bool> checkboxObservable = this.WhenAnyValue(vm => vm.ShowProcessesIsChecked);
IObservable<long> intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(200.0));
// Start/stop the monitoring.
checkboxObservable
// Skip the default unchecked state.
.Skip(1)
.ObserveOnDispatcher()
.Subscribe(
isChecked =>
{
if(isChecked)
{
Processes.AddRange(monitorService.GetProcesses());
monitorService.Start();
}
else
{
Processes.Clear();
monitorService.Stop();
}
});
// Update the memory usage and remove processes that have exited.
checkboxObservable
.Select(isChecked => isChecked ? intervalObservable : Observable.Never<long>())
// Switch disposes of the previous internal observable
// (either intervalObservable or Never) and "switches" to the new one.
.Switch()
.ObserveOnDispatcher()
.Subscribe(
_ =>
{
// Loop backwards in a normal for-loop to avoid the modification errors.
for(int i = Processes.Count - 1; i >= 0; --i)
{
if(Processes[i].ProcessObject.HasExited)
{
Processes.RemoveAt(i);
}
else
{
Processes[i].UpdateMemory();
}
}
});
// Add newly started processes to our reactive list.
monitorService.NewProcessObservable
.Where(_ => ShowProcessesIsChecked)
.ObserveOnDispatcher()
.Subscribe(process => Processes.Add(process));
}
private bool _showProcessesIsChecked;
public bool ShowProcessesIsChecked
{
get { return _showProcessesIsChecked; }
set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
}
}
(甚至可能在那里扔wpf)。您将找到许多可以使用和学习的示例。
以上是关于ReactiveUI:如何实现定期刷新的ReactiveList的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming:如何定期刷新缓存的 RDD?
如何在 Avalonia.ReactiveUI 中使用 Autofac 作为 DI 容器?
Xamarin.Forms + ReactiveUI导航,IRoutableViewModel删除/隐藏包含UrlPathSegment内容的标题栏