是否总是可以使用 Task 强制一个新线程?

Posted

技术标签:

【中文标题】是否总是可以使用 Task 强制一个新线程?【英文标题】:Is it possible always to force a new thread with Task? 【发布时间】:2012-11-14 06:26:01 【问题描述】:

每次调用Task.Factory.StartNew 时,我都会尝试创建一个新线程。问题是如何在不抛出异常的情况下运行下面的代码:

static void Main(string[] args)

    int firstThreadId = 0;

    Task.Factory.StartNew(() => firstThreadId = Thread.CurrentThread.ManagedThreadId);

    for (int i = 0; i < 100; i++)
    
        Task.Factory.StartNew(() =>
        
            while (true)
            
                Thread.Sleep(1000);
                if (firstThreadId == Thread.CurrentThread.ManagedThreadId)
                    throw new Exception("The first thread is reused.");
            
        );
    
    Console.Read();

编辑:新代码如果您注释第一个 for 语句,则没有问题。但是如果你有它,哇,消息“线程重用”被写入控制台。你能解释一下吗,因为我真的很困惑。

static void Main(string[] args)

    ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();

    for (int i = 0; i < 10; i++)
    
        Task.Factory.StartNew(() =>
        
            Task.Factory.StartNew(() =>
            
                startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId,
                    Thread.CurrentThread.ManagedThreadId, (a, b) => b);
            , TaskCreationOptions.LongRunning);

            for (int j = 0; j < 100; j++)
            
                Task.Factory.StartNew(() =>
                
                    while (true)
                    
                        Thread.Sleep(10);
                        if (startedThreads.ContainsKey(
                            Thread.CurrentThread.ManagedThreadId))
                                Console.WriteLine("Thread reused");
                    
                , TaskCreationOptions.LongRunning);
            
        );
    

    Console.Read();

【问题讨论】:

Task 库的全部意义在于您不必直接使用线程。如果您确实需要直接使用线程,请不要使用 Tasks。 您必须使用 TPL 吗?如果您的真实代码的结构与示例类似,您可以将对Task.Factory.StartNew(...) 的调用替换为简单的new Thread(...).Start(),并且您将保证实际上为每个“任务”创建了一个线程。 @KeithS:任务比线程更容易处理。 @SLaks 好吧,任务是存在的,因此您不必使用线程。但是,如果您 想要 使用线程,则不需要任务提供的抽象(实际上它会妨碍您),因此我认为 OP 会更容易考虑到他所说的需要,使用线程而不是任务。 @KeithS:不;我认为 OP 想要 Task 更好的 API 和可组合性,但需要每个回调在单独的线程上运行。 【参考方案1】:

如果您在启动任务时指定TaskCreationOptions.LongRunning,则会向调度程序提供一个提示,默认调度程序将其作为指示器来为任务创建一个新线程。

这只是一个提示 - 我不确定我是否会依赖...但我没有看到任何使用默认调度程序的反例。

【讨论】:

10x,它与当前示例的 TaskCreationOptions.LongRunning 一起使用。我将尝试重现即使这样也无济于事的问题。 brb @mynkow:你已经更新了它,但没有真正描述你所看到的。显然出了点问题,但我们不知道是什么... 10x,消息“线程重用”被写入控制台。已更新。【参考方案2】:

除了 Jon Skeet 的回答之外,如果您想保证每次都会创建一个新线程,您可以编写自己的 TaskScheduler 来创建一个新线程。

【讨论】:

【参考方案3】:

您好,谢谢大家的回答。你们都得到了+1。所有建议的解决方案都不适用于我的情况。问题是当你休眠一个线程时,它会在某个时间点被重用。上面的人建议:

使用 LongRunning => 如果您有嵌套/子级,这将不起作用 任务 自定义任务调度程序 => 我尝试自己编写并尝试了这个 ThreadPerTaskScheduler 这也不起作用。 使用纯线程 => 仍然失败... 你也可以在Multithreading.Scheduler github查看这个项目

我的解决方案

我不喜欢它,但它有效。基本上我阻塞了线程,所以它不能被重用。下面是扩展方法和一个工作示例。再次感谢您。

https://gist.github.com/4150635

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication

    public static class ThreadExtensions
    
        /// <summary>
        /// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread, int millisecondsTimeout)
        
            new WakeSleepClass(millisecondsTimeout).SleepThread();
        

        /// <summary>
        /// Blocks the current thread so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread)
        
            new WakeSleepClass().SleepThread();
        

        /// <summary>
        /// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread, TimeSpan timeout)
        
            new WakeSleepClass(timeout).SleepThread();
        

        class WakeSleepClass
        
            bool locked = true;
            readonly TimerDisposer timerDisposer = new TimerDisposer();

            public WakeSleepClass(int sleepTime)
            
                var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
                timerDisposer.InternalTimer = timer;
            

            public WakeSleepClass(TimeSpan sleepTime)
            
                var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
                timerDisposer.InternalTimer = timer;
            

            public WakeSleepClass()
            
                var timer = new Timer(WakeThread, timerDisposer, Timeout.Infinite, Timeout.Infinite);
                timerDisposer.InternalTimer = timer;
            

            public void SleepThread()
            
                while (locked)
                    lock (timerDisposer) Monitor.Wait(timerDisposer);
                locked = true;
            

            public void WakeThread(object key)
            
                locked = false;
                lock (key) Monitor.Pulse(key);
                ((TimerDisposer)key).InternalTimer.Dispose();
            

            class TimerDisposer
            
                public Timer InternalTimer  get; set; 
            
        
    

    class Program
    
        private static readonly Queue<CancellationTokenSource> tokenSourceQueue = new Queue<CancellationTokenSource>();
        static void Main(string[] args)
        
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            tokenSourceQueue.Enqueue(tokenSource);

            ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();
            for (int i = 0; i < 10; i++)
            
                Thread.Sleep(1000);
                Task.Factory.StartNew(() =>
                
                    startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
                    for (int j = 0; j < 50; j++)
                        Task.Factory.StartNew(() => startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b));

                    for (int j = 0; j < 50; j++)
                    
                        Task.Factory.StartNew(() =>
                        
                            while (!tokenSource.Token.IsCancellationRequested)
                            
                                if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
                                Thread.CurrentThread.Block(10);
                                if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
                            
                        , tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                        .ContinueWith(task =>
                        
                            WriteExceptions(task.Exception);
                            Console.WriteLine("-----------------------------");
                        , TaskContinuationOptions.OnlyOnFaulted);
                    
                    Thread.CurrentThread.Block();
                , tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                .ContinueWith(task =>
                
                    WriteExceptions(task.Exception);
                    Console.WriteLine("-----------------------------");
                , TaskContinuationOptions.OnlyOnFaulted);
            

            Console.Read();
        

        private static void WriteExceptions(Exception ex)
        
            Console.WriteLine(ex.Message);
            if (ex.InnerException != null)
                WriteExceptions(ex.InnerException);
        
    

【讨论】:

【参考方案4】:

试试这个:

var taskCompletionSource = new TaskCompletionSource<bool>();
Thread t = new Thread(() =>

    try
    
        Operation();
        taskCompletionSource.TrySetResult(true);
    
    catch (Exception e)
    
        taskCompletionSource.TrySetException(e);
    
);
void Operation()

    // Some work in thread

t.Start();
await taskCompletionSource.Task;

您还可以为 Action、Func 等编写扩展方法。 例如:

public static Task RunInThread(
    this Action action,
    Action<Thread> initThreadAction = null)

    TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();

    Thread thread = new Thread(() =>
    
        try
        
            action();
            taskCompletionSource.TrySetResult(true);
        
        catch (Exception e)
        
            taskCompletionSource.TrySetException(e);
        
    );
    initThreadAction?.Invoke(thread);
    thread.Start();

    return taskCompletionSource.Task;

public static Task<TResult> RunInThread<T1, T2, TResult>(
    this Func<T1, T2, TResult> function,
    T1 param1,
    T2 param2,
    Action<Thread> initThreadAction = null)

    TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>();

    Thread thread = new Thread(() =>
    
        try
        
            TResult result = function(param1, param2);
            taskCompletionSource.TrySetResult(result);
        
        catch (Exception e)
        
            taskCompletionSource.TrySetException(e);
        
    );
    initThreadAction?.Invoke(thread);
    thread.Start();

    return taskCompletionSource.Task;

并像这样使用它:

var result = await some_function.RunInThread(param1, param2).ConfigureAwait(true);

【讨论】:

【参考方案5】:

只需用 new Thread() 启动线程,然后用 Start() 启动它们

static void Main(string[] args)

    ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();

    for (int i = 0; i < 10; i++)
    
        new Thread(() =>
        
            new Thread(() =>
            
                startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
            ).Start();

            for (int j = 0; j < 100; j++)
            
                new Thread(() =>
                
                    while (true)
                    
                        Thread.Sleep(10);
                        if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId))
                            Console.WriteLine("Thread reused");
                    
                ).Start();
            
        ).Start();
    

    Console.Read();


任务应该由调度程序管理。 Tasks 的整个想法是运行时将决定何时需要一个新线程。另一方面,如果您确实需要不同的线程,则代码中的其他内容可能是错误的,例如过度依赖 Thread.Sleep() 或线程本地存储。

正如所指出的,您可以创建自己的 TaskScheduler 并使用任务来创建线程,但是为什么要使用 Tasks 开始呢?

【讨论】:

我同意这一点,除非线程中运行的操作是异步任务。【参考方案6】:

这是一个自定义TaskScheduler,它在每个任务的专用线程上执行任务:

public class ThreadPerTask_TaskScheduler : TaskScheduler

    protected override void QueueTask(Task task)
    
        var thread = new Thread(() => TryExecuteTask(task));
        thread.IsBackground = true;
        thread.Start();
    

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    
        return TryExecuteTask(task);
    

    protected override IEnumerable<Task> GetScheduledTasks()  yield break; 

使用示例:

var parallelOptions = new ParallelOptions()

    MaxDegreeOfParallelism = 3,
    TaskScheduler = new ThreadPerTask_TaskScheduler()
;

Parallel.ForEach(Enumerable.Range(1, 10), parallelOptions, item =>

    Console.WriteLine($"DateTime.Now:HH:mm:ss.fff" +
        $" [Thread.CurrentThread.ManagedThreadId]" +
        $" Processing #item" +
        (Thread.CurrentThread.IsBackground ? ", Background" : "") +
        (Thread.CurrentThread.IsThreadPoolThread ? ", ThreadPool" : ""));
    Thread.Sleep(1000); // Simulate CPU-bound work
);

输出:

20:38:56.770 [4] Processing #3, Background
20:38:56.770 [5] Processing #2, Background
20:38:56.770 [1] Processing #1
20:38:57.782 [1] Processing #4
20:38:57.783 [8] Processing #5, Background
20:38:57.783 [7] Processing #6, Background
20:38:58.783 [1] Processing #7
20:38:58.783 [10] Processing #8, Background
20:38:58.787 [9] Processing #9, Background
20:38:59.783 [1] Processing #10

Try it on Fiddle.

这个自定义TaskScheduler 也允许当前线程参与计算。这在上面的示例中通过线程[1] 处理项目#1#4#7#10 进行了演示。如果您不希望发生这种情况,只需将TryExecuteTaskInline 中的代码替换为return false;

另一个例子,以Task.Factory.StartNew 方法为特色。在 100 个不同的线程上启动 100 个任务:

var oneThreadPerTask = new ThreadPerTask_TaskScheduler();
Task[] tasks = Enumerable.Range(1, 100).Select(_ =>

    return Task.Factory.StartNew(() =>
    
        Thread.Sleep(1000); // Simulate long-running work
    , default, TaskCreationOptions.None, oneThreadPerTask);
).ToArray();

在这种情况下,当前线程没有参与工作,因为所有任务都是通过调用它们的 Start 方法在幕后启动的,而不是 RunSynchronously.

【讨论】:

以上是关于是否总是可以使用 Task 强制一个新线程?的主要内容,如果未能解决你的问题,请参考以下文章

降低Task.Factory.StartNew线程的优先级

创建线程总是对性能有好处吗?

TryExecuteTask(task) 是不是总是阻塞?

在当前线程上执行任务

使用 task_group 的英特尔线程构建模块性能不佳(新用户)

如何在c#控制台应用程序中自动(强制)停止当前线程[重复]