自定义一个简单的Task调度器任务循环调度器TaskScheduler
Posted youliCC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义一个简单的Task调度器任务循环调度器TaskScheduler相关的知识,希望对你有一定的参考价值。
前言:
自从接触异步(async await Task)操作后,始终都不明白,这个Task调度的问题。
接触Quartz.net已经很久了,只知道它实现了一套Task调度的方法,自己跟着Quartz.net源代码写了遍,调试后我算是明白了Task调度的一部分事( )。
春风来不远,只在屋东头。
理解Task运行,请参考大佬文章 https://www.cnblogs.com/artech/p/task_scheduling.html ,推荐大佬的书。
直到我看Quartz.net源代码中的任务调度 “QueuedTaskScheduler”,我才搞明白了,如何写一个简单的任务调度器,或者说线程如何执行代码,才不会造成死循环,CPU吃满等问题,下面代码有的直接从quartz.net copy过来的。
BlockingCollection类
微软文档 https://learn.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview
个人博客,中文解释通俗易懂 https://www.cnblogs.com/gl1573/p/14595985.html
BlockingCollection 提供一个很重要的“阻塞”功能。
TaskScheduler类
TaskScheduler 直译过来:表示一个对象,该对象处理将任务排队到线程上的低级工作。
该类为抽象类,其真正意义在于“对Task任务的编排”
基于TaskScheduler类实现自定义的“Task队列调度器”
源代码,我的仓库 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler
1.定义一个存储Task的队列容器,使用BlockingCollection容器来添加Task,为什么使用BlockingCollection,后面会解释
/// <summary>The collection of tasks to be executed on our custom threads.</summary>
private readonly BlockingCollection<Task> _blockingTaskQueue;
2.定义CancellationTokenSource变量,用于释放。通常就是调用 CancellationToken.ThrowIfCancellationRequested() ,抛出一个 “OperationCanceledException”的异常,使正在执行的Task任务停止。
3.创建Thread数组,用于存储创建出的Thread
/// <summary>The threads used by the scheduler to process work.</summary>
private readonly Thread[] _threads;
4.自定义一个类QueuedTaskScheduler,继承 “TaskScheduler”,“IDisposable”
public class QueuedTaskScheduler: System.Threading.Tasks.TaskScheduler, IDisposable
实现构造函数
public QueuedTaskScheduler(int threadCount)
_threadCount = threadCount;
_blockingTaskQueue = new BlockingCollection<Task>();
// create threads
_threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
_threads[i] = new Thread(ThreadBasedDispatchLoop)
Priority = ThreadPriority.Normal,
IsBackground = true,
Name = $"threadName (i)"
;
// start
foreach (var thread in _threads)
thread.Start();
在构造函数中创建,并启动“Thread”,构造函数接收一个“线程数量的参数”,控制开启的线程数。
Thread中实现的委托为“ThreadBasedDispatchLoop”,其表达意思是“基于循环的调度”。
5.重点来了,具体看下“ThreadBasedDispatchLoop”方法的实现
ThreadBasedDispatchLoop实现
/// <summary>The dispatch loop run by all threads in this scheduler.</summary>
private void ThreadBasedDispatchLoop()
_taskProcessingThread.Value = true;
try
// If a thread abort occurs, we\'ll try to reset it and continue running.
while (true)
try
// For each task queued to the scheduler, try to execute it.
foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
TryExecuteTask(task);
catch (ThreadAbortException)
// If we received a thread abort, and that thread abort was due to shutting down
// or unloading, let it pass through. Otherwise, reset the abort so we can
// continue processing work items.
if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
#pragma warning disable SYSLIB0006
Thread.ResetAbort();
#pragma warning restore SYSLIB0006
catch (OperationCanceledException)
// If the scheduler is disposed, the cancellation token will be set and
// we\'ll receive an OperationCanceledException. That OCE should not crash the process.
finally
_taskProcessingThread.Value = false;
在外层套一层try catch捕获 CancellationTokenSource 变量,取消操作(CancellationTokenSource.Cancel())产生的异常,并且忽略该异常。
其中使用while(true),无限循环执行,?????奇了怪了,为什么以前写代码时,while(true)写了,会直接把CPU吃满,程序搞奔溃呢????
关键点就在于
当_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行时,如果_blockingTaskQueue容器中没有元素时,执行就会被“阻塞”,这种阻塞不会造成或者造成很小的资源浪费。
当_blockingTaskQueue有值时,阻塞就会停止,_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行,返回一个Task对象,然后开始执行 TryExecuteTask(task) ,执行Task。
6.继承 “TaskScheduler”后需要实现的几个方法
GetScheduledTasks
protected override IEnumerable<Task>? GetScheduledTasks()
return _blockingTaskQueue.ToList();
GetScheduledTasks 返回需要被调度的 Tasks
QueueTask
protected override void QueueTask(Task task)
// QueuedTaskScheduler 释放时,禁止向队列中添加Task
if (_disposeCancellation.IsCancellationRequested)
throw new ObjectDisposedException(GetType().Name);
_blockingTaskQueue.Add(task);
QueueTask 将排队等候的Task加入到 “_blockingTaskQueue”队列变量中
TryExecuteTaskInline
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
// If we\'re already running tasks on this threads, enable inlining
return _taskProcessingThread.Value && TryExecuteTask(task);
意思是,参数task是否在此线程上运行,请查看ThreadBasedDispatchLoop方法。
“ThreadLocal<bool>” 该类型变量声明生命周期跟随 “构造函数”中启动的线程,且每一个线程单独一个变量,值存储在线程上。
自此自定义“Task调度器”完成。
启动,运行QueuedTaskScheduler
1. 创建 QueuedTaskScheduler ,其中用于执行Task的线程数为 1
2.创建 Task ,并将其加入到指定的 Task调度器中。
3.调试一下
A. 创建 QueuedTaskScheduler ,创建 线程 “Thread” ,并启动线程
B. 调试过程
当 _blockingTaskQueue没有Task时,执行到 _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token) 就会阻塞。
自此 自定义TaskScheduler完成。
我的源代码 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler
本文来自博客园,作者:youliCC,转载请注明原文链接:https://www.cnblogs.com/youlicc/p/17403429.html
力扣621. 任务调度器 模拟法
暴力模拟 哈希计数存储 用哈希表初始化容器 容器存储自定义结构体 每轮循环结构体自定义排序 按每类任务的剩余次数降序排序 优先选择剩余次数最大的任务
struct zifu //保存一种任务种类,num存储剩余任务次数
char fu;
int num;
;
static bool cmp(const zifu &a, const zifu &b) //自定义排序方式,以任务次数降序排序
return a.num > b.num;
int leastInterval(vector<char>& tasks, int n)
if (n == 0) return tasks.size();
unordered_map<char, int>mymap;//哈希存储字符和任务次数
vector<zifu>mv;//用容器来对剩余任务自定义排序
for (int i = 0; i < tasks.size(); i++) //存入哈希表
mymap[tasks[i]]++;
for (auto it = mymap.begin(); it != mymap.end(); it++) //通过哈希表建立容器
zifu x;
x.fu = it->first;
x.num = it->second;
mv.push_back(x);
int kind = mymap.size(), time = 0;//kind表示剩余任务种类数
//每轮循环要优先选择剩余次数最多的任务种类,这样才能达到最少时间,这是关键,这样才能最大化利用时间空隙
while (kind > 0)
sort(mv.begin(), mv.end(), cmp);//每轮让剩余次数最多的任务排在前面
if (kind > n) //剩余种类个数大于间隔时间,可以先取n+1种类取一个任务做 ,注意不是取所有种类取一个任务做,而是取最小要求的n+1个,这样可以让time达到最小
//PS:剩余种类个数等于间隔时间是不行的 此时需要等待一个单位时间才能进行下一轮 临界条件注意区分
int i = 0,j=0;//i为本次for循环中剩余次数减为0的种类数
for (auto it = mv.begin(); j<=n;j++) //取n+1个任务来做
(*it).num--;
if ((*it).num == 0)
it = mv.erase(it);
i++;
else it++;
time += (n + 1);
kind -= i;
else //剩余种类个数小于或等于间隔时间
int i = 0;
for (auto it = mv.begin(); it != mv.end();)
(*it).num--;
if ((*it).num == 0)
it = mv.erase(it);
i++;
else it++;
time += (n + 1);
kind -= i;
if (kind == 0) time = time - (n + 1) + i;//如果这是最后一轮,那么time加多了,因为把任务做完就不需要等待了,time += n + 1;多算上了等待时间
return time;
以上是关于自定义一个简单的Task调度器任务循环调度器TaskScheduler的主要内容,如果未能解决你的问题,请参考以下文章