在Windows服务中以低于正常优先级停止Parallel.ForEach
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Windows服务中以低于正常优先级停止Parallel.ForEach相关的知识,希望对你有一定的参考价值。
我的Windows服务中有一个Parallel.ForEach
代码。如果ParallelOptions.MaxDegreeOfParallelism
设置为-1,我将使用我的大部分CPU。然而,停止服务持续半分钟。应该接收服务应该停止的信号的某些内部控制器线程会超出处理器时间。我将进程优先级设置为低于正常值,但这可能是不相关的信息。
即使所有线程都忙,我还能做些什么来缩短停止服务的时间?
我正在试图暂时降低线程池中线程的优先级,因为我没有任何异步代码,但Internet说这是一个坏主意,所以在这里要求“正确”的方式。
在所有情况下,线程(OS和.NET)在OnStart
和OnStop
之间都是不同的。此外,如果停止时间非常长,那么最终将调用OnStop
的OS线程是一个新线程,而不是在日志中更早显示。
要构建此代码,请创建新的Windows服务项目,从设计器添加ProjectInstaller类,将Account更改为LocalService,然后使用InstallUtil安装一次。确保LocalService可以写入C: Temp。
public partial class Service1 : ServiceBase
{
private ManualResetEvent stopEvent = new ManualResetEvent(false);
private Task mainTask;
private StreamWriter writer = File.AppendText(@"C:TempLog.txt");
public Service1()
{
InitializeComponent();
writer.AutoFlush = true;
}
protected override void OnStart(string[] args)
{
Log("--------------");
Log("OnStart");
mainTask = Task.Run(new Action(Run));
}
protected override void OnStop()
{
Log("OnStop");
stopEvent.Set();
mainTask.Wait();
Log("--------------");
}
private void Log(string line)
{
writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
}
private void Run()
{
try
{
using (var sha = SHA256.Create())
{
var parallelOptions = new ParallelOptions();
parallelOptions.MaxDegreeOfParallelism = -1;
Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
parallelOptions, (fileName, parallelLoopState) =>
{
if (stopEvent.WaitOne(0))
{
Log("Stop requested");
parallelLoopState.Stop();
return;
}
try
{
var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
}
catch (Exception ex)
{
Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
}
});
}
}
catch (Exception ex)
{
Log(String.Format("exception={0}", ex.Message));
}
}
}
这是一个有效的代码。它立即停止。请注意,主要想法来自:SylF。
但是我无法清楚地解释它为什么会发生...更新(在你的评论之后):你找到了原因并且很好地解释了为什么你有这种行为。谢谢!我很高兴知道。
尽管作业是在低优先级线程中完成的,但您不应该注意到CPU几乎没有工作的机器上的任何额外延迟。
对不起,我混淆了你的代码示例以实现一些测试。但主要的想法是改变调度程序(似乎不推荐)。但这是我发现的唯一方式。
码:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackOverflowQuestionWindowsService1
{
public partial class Service1 : ServiceBase
{
private ManualResetEvent stopEvent = new ManualResetEvent(false);
private Task mainTask;
private StreamWriter writer = File.CreateText(@"C:TempLog.txt"); //TAKE CARE - I do not append anymore ********
private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private int count = 0;
public Service1()
{
InitializeComponent();
writer.AutoFlush = true;
}
protected override void OnStart(string[] args)
{
Log("--------------");
Log("OnStart");
Task.Run(()=>Run());
}
protected override void OnStop()
{
Log("OnStop with actual thread count: " + Process.GetCurrentProcess().Threads.Count);
cancellationTokenSource.Cancel();
}
private void Log(string line)
{
writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
}
private void Run()
{
Stopwatch stopWatchTotal = new Stopwatch();
stopWatchTotal.Start();
try
{
using (var sha = SHA256.Create())
{
var parallelOptions = new ParallelOptions();
parallelOptions.MaxDegreeOfParallelism = -1;
parallelOptions.CancellationToken = cancellationTokenSource.Token;
parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);
Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
parallelOptions, (fileName, parallelLoopState) =>
{
// Thread.CurrentThread.Priority = ThreadPriority.Lowest;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
Interlocked.Increment(ref count);
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
Log(String.Format($"{count}"));
return;
}
try
{
var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
stopWatch.Stop();
Log(FormatTicks(stopWatch.ElapsedTicks));
Log(String.Format($"{count}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}"));
}
catch (Exception ex)
{
Log(String.Format($"{count} file={fileName}, exception={ex.Message}"));
}
});
}
}
catch (Exception ex)
{
Log(String.Format("exception={0}", ex.Message));
}
stopWatchTotal.Stop();
Log(FormatTicks(stopWatchTotal.ElapsedTicks));
writer.Close();
Process.GetCurrentProcess().Kill();
}
private string FormatTicks(long ticks)
{
return new TimeSpan(ticks).ToString();
}
}
}
优先计划程序:(感谢Roman Starkov:来自StackOverflow的Bnaya Eshet的Microsoft)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackOverflowQuestionWindowsService1
{
public class PriorityScheduler : TaskScheduler
{
public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);
private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
private Thread[] _threads;
private ThreadPriority _priority;
private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);
public PriorityScheduler(ThreadPriority priority)
{
_priority = priority;
}
public override int MaximumConcurrencyLevel
{
get { return _maximumConcurrencyLevel; }
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
if (_threads == null)
{
_threads = new Thread[_maximumConcurrencyLevel];
for (int i = 0; i < _threads.Length; i++)
{
int local = i;
_threads[i] = new Thread(() =>
{
foreach (Task t in _tasks.GetConsumingEnumerable())
base.TryExecuteTask(t);
});
_threads[i].Name = string.Format("PriorityScheduler: ", i);
_threads[i].Priority = _priority;
_threads[i].IsBackground = true;
_threads[i].Start();
}
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false; // we might not want to execute task that should schedule as high or low priority inline
}
}
}
此代码将在一两秒内停止服务,而已经计算的线程将在完成其实际工作后结束。正如您在服务中看到的那样,OnStop方法立即接收信号。但是,TaskManager显示只有在消费线程全部完成后,与服务关联的进程才会停止。
这使用了一个单独的线程正在填充的字符串(路径)的BlockingCollection。并且有许多具有低优先级的线程将消耗字符串。
public partial class Service1 : ServiceBase
{
private StreamWriter writer = File.AppendText(@"C: empLog.txt");
const int nbTreads = 30;
BlockingCollection<string> dataItems;
bool stopCompute = false;
List<Thread> threads = new List<Thread>();
Thread threadProd;
private object aLock = new object();
public Service1()
{
InitializeComponent();
dataItems = new BlockingCollection<string>(nbTreads);
writer.AutoFlush = true;
}
protected override void OnStart(string[] args)
{
Log("--------------");
Log("OnStart");
threadProd = new Thread(new ThreadStart(ProduireNomFichier));
threadProd.Start();
Thread.Sleep(1000); // fill the collection a little
for (int i = 0; i < nbTreads; i++)
{
Thread threadRun = ne以上是关于在Windows服务中以低于正常优先级停止Parallel.ForEach的主要内容,如果未能解决你的问题,请参考以下文章