如何让Task在非线程池线程中执行?

Posted tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何让Task在非线程池线程中执行?相关的知识,希望对你有一定的参考价值。

Task承载的操作需要被调度才能被执行,由于.NET默认采用基于线程池的调度器,所以Task默认在线程池线程中执行。但是有的操作并不适合使用线程池,比如我们在一个ASP.NET Core应用中承载了一些需要长时间执行的后台操作,由于线程池被用来处理HTTP请求,如果这些后台操作也使用线程池来调度,就会造成相互影响。在这种情况下,使用独立的一个或者多个线程来执行这些后台操作可能是一个更好的选择。

Task承载的操作需要被调度才能被执行,由于.NET默认采用基于线程池的调度器,所以Task默认在线程池线程中执行。但是有的操作并不适合使用线程池,比如我们在一个ASP.NET Core应用中承载了一些需要长时间执行的后台操作,由于线程池被用来处理HTTP请求,如果这些后台操作也使用线程池来调度,就会造成相互影响。在这种情况下,使用独立的一个或者多个线程来执行这些后台操作可能是一个更好的选择。

一、基于线程池的调度
二、TaskCreationOptions.LongRunning
三、换成异步操作呢?
四、换种写法呢?
五、调用Wait方法
六、自定义TaskScheduler
七、独立线程池

一、基于线程池的调度

我们通过如下这个简单的程序来验证默认基于线程池的Task调度。我们调用Task类型的静态属性Factory返回一个TaskFactory对象,并调用其StartNew方法启动一个Task对象,这个Task指向的Run方法会在一个循环中调用Do方法。Do方法使用自选等待的方式模拟一段耗时2秒的操作,并在控制台输出当前线程的IsThreadPoolThread属性确定是否是线程池线程。

Task.Factory.StartNew(Run);
Console.Read();

void Run()

    while (true)
    
        Do();
    


void  Do()

    var end = DateTime.UtcNow.AddSeconds(2);
    SpinWait.SpinUntil(() => DateTimeOffset.UtcNow > end);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

通过如下所示的输出结果,我们得到了答案:利用TaskFactory创建的Task在默认情况下确实是通过线程池的形式被调度的。

二、TaskCreationOptions.LongRunning

很明显,上述Run方法是一个需要永久执行的LongRunning操作,并不适合使用线程池来执行,实际上TaskFactory在设计的时候就考虑到了这一点,我们利用它创建一个Task的时候可以指定对应的TaskCreationOptions选项,其中一个选项就是LongRuning。我们通过如下的方式修改了上面这段程序,在调用StartNew方法时指定了这个选项。

Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
Console.Read();

void Run()

    while (true)
    
        Do();
    


void  Do()

    var end = DateTime.UtcNow.AddSeconds(2);
    SpinWait.SpinUntil(() => DateTimeOffset.UtcNow > end);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

再次执行我们的程序就会通过如下的输出结果看到Do方法将不会在线程池线程中执行了。

三、换成异步操作呢?

由于LongRunning操作经常会涉及IO操作,所以我们执行方法经常会写成异步的形式。如下所示的代码中,我们将Do方法替换成DoAsync,将2秒的自旋等待替换成Task.Delay。由于DoAsync写成了异步的形式,Run也换成对应的RunAsync。

Task.Factory.StartNew(RunAsync, TaskCreationOptions.LongRunning);
Console.Read();

async Task RunAsync()

    while (true)
    
       await DoAsync();
    


async Task  DoAsync()

    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

再次启动程序后,我们发现又切换成了线程池调度了。为什么会这样呢?其实很好理解,由于原来返回void的Run方法被替换成了返回Task的RunAsync,传入StartNew方法表示执行操作的委托类型从Action切换成了Func<Task>,虽然我们指定了LongRunning选项,但是StartNew方法只是采用这种模式执行Func<Task>这个委托对象而已,而这个委托在遇到await的时候就返回了。终于返回的Task对象,还是按照默认的方式进行调度执行。

四、换种写法呢?

有人说,上面我们使用的是一个方法来表示作为参数的委托对象,如果我们按照如下的方式使用基于async/await的Lambda表达式呢?实际上这样的Lambda表达式就是Func<Task>的另一种编写方式而已。

Task.Factory.StartNew(async () =>  while (true) await DoAsync();, TaskCreationOptions.LongRunning);
Console.Read();


async Task  DoAsync()

    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

五、调用Wait方法

其实这个问题很好解决,按照如下的方式将DoAsync方法换成同步形式的Do,将基于await的等待替换成针对Wait方法的调用就可以了。我想当你接触Task的时候,就有很多人不断提醒你,谨慎使用Wait方法,因为它会阻塞当前线程。实际上对于我们的硬要用场景,调用Wait方法才是正确的选择,因为我们的初衷就是使用一个独立的线程以独占的方式来执行所需的操作。

Task.Factory.StartNew(() =>  while (true) Do(); , TaskCreationOptions.LongRunning);
Console.Read();

void  Do()

    Task.Delay(2000).Wait();
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

六、自定义TaskScheduler

既然针对线程池的使用是“Task调度”导致的,那么我们自然可以通过重写TaskScheduler的方式来解决这个问题。如下这个自定义的DedicatedThreadTaskScheduler 会采用独立的线程来执行被调度的Task,线程的数量可以参数来指定。

internal sealed class DedicatedThreadTaskScheduler : TaskScheduler

    private readonly BlockingCollection<Task> _tasks = new();
    private readonly Thread[] _threads;
    protected override IEnumerable<Task>? GetScheduledTasks() => _tasks;
    protected override void QueueTask(Task task) => _tasks.Add(task);
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
    public DedicatedThreadTaskScheduler(int threadCount)
    
        _threads = new Thread[threadCount];
        for (int index = 0; index < threadCount; index++)
        
            _threads[index] = new Thread(_ =>
            
                while (true)
                
                    TryExecuteTask(_tasks.Take());
                
            );
        
        Array.ForEach(_threads, it => it.Start());
    

我们演示实例中Run/Do方法再次还原成如下所示的纯异步模式的RunAsync/DoAsync,并在调用StartNew方法的时候创建一个DedicatedThreadTaskScheduler对象作为最后一个参数。

Task.Factory.StartNew(RunAsync, CancellationToken.None, TaskCreationOptions.LongRunning, new DedicatedThreadTaskScheduler(1));
Console.Read();

async Task RunAsync()

    while (true)
    
        await DoAsync();
    


async Task DoAsync()

    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[DateTimeOffset.Now]Is thread pool thread: isThreadPoolThread");

由于创建的Task将会使用指定的DedicatedThreadTaskScheduler 对象来调度,DoAsync方法自然就不会在线程池线程中执行了。

七、独立线程池

.NET提供的线程池是一个全局共享的线程池,而我们定义的DedicatedThreadTaskScheduler相当于创建了一个独立的线程池,对象池的效果可以通过如下这个简单的程序展现出来。

Task.Factory.StartNew(()=> Task.WhenAll( Enumerable.Range(1,6).Select(it=>DoAsync(it))),
        CancellationToken.None,
        TaskCreationOptions.None,
        new DedicatedThreadTaskScheduler(2));

async Task DoAsync(int index)

    await Task.Yield();
    Console.WriteLine($"[DateTimeOffset.Now.ToString("hh:MM:ss")]Task index is executed in thread Environment.CurrentManagedThreadId");
    var endTime = DateTime.UtcNow.AddSeconds(4);
    SpinWait.SpinUntil(() => DateTime.UtcNow > endTime);
    await Task.Delay(1000);

Console.ReadLine();

如上面的代码片段所示,异步方法DoAsync利用自旋等待模拟了一段耗时4秒的操作,通过调用Task.Delay方法模拟了一段耗时1秒的IO操作。我们在其中输出了任务开始执行的时间和当前线程ID。在调用的StartNew方法中,我们调用这个DoAsync方法创建了6个Task,这些Task交给创建的DedicatedThreadTaskScheduler进行调度。我们为这个DedicatedThreadTaskScheduler指定的线程数量为2。从如下所示的输出结果可以看出,6个操作确实在两个线程中执行的。

Java并发编程实践读书笔记 线程池的使用

Executor与Task的耦合性

1,除非线程池很非常大,否则一个Task不要依赖同一个线程服务中的另外一个Task,因为这样容易造成死锁;

2,线程的执行是并行的,所以在设计Task的时候要考虑到线程安全问题。如果你认为只会在单任务线程的Executor中运行的话,从设计上讲这就已经耦合了。

3,长时间的任务有可能会影响到其他任务的执行效率,可以让其他线程在等待的时候限定一下等待时间。不要无限制地等待下去。

 

确定线程池的大小

给出如下定义:

要使CPU达到期望的使用率,线程池的大小应设置为:

约等于Ncpu+1。

定制线程池

通过Executor来创建的线程池其实是预先给我们打造好的线程池,例如:

如果不能满足需求,可以自己量身定做:

    public ThreadPoolExecutor(int corePoolSize, //基本大小,没有执行任务时线程池的大小
                              int maximumPoolSize,  //最大任务数量
                              long keepAliveTime, //最大空闲时间,超时后会被回收
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
             //... 
    }

任务队列:缓存任务

前面有说到,使用线程池的好处是可以限制线程的数量,因此保证服务器不会被超量的线程给拖垮。当并发请求增加,我们不再是盲目地创建线程了。而是简单地创建一个任务缓存到处理队列中了。那么问题来了,当并发请求继续激增,服务线程无法消化任务,造成大量的任务堆积到队列中。这也是会消耗服务器资源的。

我们需要让服务器实在忙不过来的时候,扔掉一些请求。以保证自己不会奔溃。该放手的时候一定要放手,不是每个人都能随随便便造一个航天飞船发上天啊。服务器真的应付不过来的时候最重要的是要保护好自己。

创建线程池的时候,可以自己指定任务队列:

无界队列:就是创建队列的时候不设置大小;

有界队列:通上设置一个大小;

同步交换队列:SynchronousQueue,严格意义上说,它并不是真的队列,它只是一种数据交换的工具。任务来之后,这个队列直接把任务交给线程池中的一个线程。如果没有可用线程,则创建一个线程。如果不能创建了会拒绝这个任务。同步交换队列的核心就是它不缓存任务,要么线程池足够大,要么就直接拒绝。用于线程池可以无限增长或不会出现任务激增的场景。

可以用PriorityBlockingQueue来实现任务的排序。

饱和策略:任务队列中都存不了了怎么办

 线程池提供了多种处理办法:

Abort:终止,对外抛RejectedExecutionException;

Discard:悄悄抛弃,注意是悄悄地,客户端根本不会知道自己提交的任务根本就没运行;

DiscardOldest:抛弃最老的,如果使用的是优先队列,被抛弃的会是优先级最高的那个(所以DiscardOldest不要和优先队列搭配使用)。

Caller-Runs:用客户端线程来直接运行任务代码。就是哪个线程在往线程池扔任务,那么就用这个线程来直接跑这个任务。这么做的好处是,让客户端分担线程池的负担,更主要的是让客户端先忙一会儿别的,从而暂停往线程池提交任务的动作。以期望线程池能最终缓过来。客户端都来帮着线程池做事了,那它自己的事肯定也就没人做了。如果客户端代码是一个Web请求处理程序,那么它将不再accpet新的请求,这些新的请求会被堵在TCP的缓存中,如果系统一直没有缓过劲来,再继续发展下去,就会把“过载”的信息弥漫到TCP的调用端了。整个这套设计的潜台词是“我们已经尽力了”。系统不会硬着陆直接挂掉,如果请求压力放缓,系统还是可以扛过来的。

线程工厂:定制线程

可以自己实现线程工厂来构建线程变量,从而定义更多需要的信息。

扩展ThreadPoolExecutor

ThreadPoolExecutor有一些生命周期的方法:beforeExecute、afterExecute和terminated。可以重写这些方法,实现统计和监控的功能。

 

递归算法的并行化

什么样的情况可以并行?

比如for循环去做一些事情,而这些事情是相互独立的。那么这些事情其实可以并行来做的(for循环是串行执行)。

这给我们的编程提供了新的思路,很多写了很多遍、平淡无奇的代码其实有更有趣的实现方式。

 

以上是关于如何让Task在非线程池线程中执行?的主要内容,如果未能解决你的问题,请参考以下文章

菜鸟之旅——学习线程(Task)

C# 线程线程池Task概念+代码实践

Java线程池ThreadPoolExecuter:execute()原理

C#的 Task,Thread,ThreadPool 之间有啥异同

深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系

整理:如何估算吞吐量以及线程池大小