是否总是可以使用 Task 强制一个新线程?
Posted
技术标签:
【中文标题】是否总是可以使用 Task 强制一个新线程?【英文标题】:Is it possible always to force a new thread with Task? 【发布时间】:2012-11-14 06:26:01 【问题描述】:每次调用Task.Factory.StartNew
时,我都会尝试创建一个新线程。问题是如何在不抛出异常的情况下运行下面的代码:
static void Main(string[] args)
int firstThreadId = 0;
Task.Factory.StartNew(() => firstThreadId = Thread.CurrentThread.ManagedThreadId);
for (int i = 0; i < 100; i++)
Task.Factory.StartNew(() =>
while (true)
Thread.Sleep(1000);
if (firstThreadId == Thread.CurrentThread.ManagedThreadId)
throw new Exception("The first thread is reused.");
);
Console.Read();
编辑:新代码如果您注释第一个 for
语句,则没有问题。但是如果你有它,哇,消息“线程重用”被写入控制台。你能解释一下吗,因为我真的很困惑。
static void Main(string[] args)
ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 10; i++)
Task.Factory.StartNew(() =>
Task.Factory.StartNew(() =>
startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId,
Thread.CurrentThread.ManagedThreadId, (a, b) => b);
, TaskCreationOptions.LongRunning);
for (int j = 0; j < 100; j++)
Task.Factory.StartNew(() =>
while (true)
Thread.Sleep(10);
if (startedThreads.ContainsKey(
Thread.CurrentThread.ManagedThreadId))
Console.WriteLine("Thread reused");
, TaskCreationOptions.LongRunning);
);
Console.Read();
【问题讨论】:
Task 库的全部意义在于您不必直接使用线程。如果您确实需要直接使用线程,请不要使用 Tasks。 您必须使用 TPL 吗?如果您的真实代码的结构与示例类似,您可以将对Task.Factory.StartNew(...)
的调用替换为简单的new Thread(...).Start()
,并且您将保证实际上为每个“任务”创建了一个线程。
@KeithS:任务比线程更容易处理。
@SLaks 好吧,任务是存在的,因此您不必使用线程。但是,如果您 想要 使用线程,则不需要任务提供的抽象(实际上它会妨碍您),因此我认为 OP 会更容易考虑到他所说的需要,使用线程而不是任务。
@KeithS:不;我认为 OP 想要 Task
更好的 API 和可组合性,但需要每个回调在单独的线程上运行。
【参考方案1】:
如果您在启动任务时指定TaskCreationOptions.LongRunning
,则会向调度程序提供一个提示,默认调度程序将其作为指示器来为任务创建一个新线程。
这只是一个提示 - 我不确定我是否会依赖...但我没有看到任何使用默认调度程序的反例。
【讨论】:
10x,它与当前示例的 TaskCreationOptions.LongRunning 一起使用。我将尝试重现即使这样也无济于事的问题。 brb @mynkow:你已经更新了它,但没有真正描述你所看到的。显然出了点问题,但我们不知道是什么... 10x,消息“线程重用”被写入控制台。已更新。【参考方案2】:除了 Jon Skeet 的回答之外,如果您想保证每次都会创建一个新线程,您可以编写自己的 TaskScheduler
来创建一个新线程。
【讨论】:
【参考方案3】:您好,谢谢大家的回答。你们都得到了+1。所有建议的解决方案都不适用于我的情况。问题是当你休眠一个线程时,它会在某个时间点被重用。上面的人建议:
使用 LongRunning => 如果您有嵌套/子级,这将不起作用 任务 自定义任务调度程序 => 我尝试自己编写并尝试了这个 ThreadPerTaskScheduler 这也不起作用。 使用纯线程 => 仍然失败... 你也可以在Multithreading.Scheduler github查看这个项目我的解决方案
我不喜欢它,但它有效。基本上我阻塞了线程,所以它不能被重用。下面是扩展方法和一个工作示例。再次感谢您。
https://gist.github.com/4150635
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
public static class ThreadExtensions
/// <summary>
/// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
/// </summary>
public static void Block(this Thread thread, int millisecondsTimeout)
new WakeSleepClass(millisecondsTimeout).SleepThread();
/// <summary>
/// Blocks the current thread so that the thread cannot be reused by the threadpool.
/// </summary>
public static void Block(this Thread thread)
new WakeSleepClass().SleepThread();
/// <summary>
/// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
/// </summary>
public static void Block(this Thread thread, TimeSpan timeout)
new WakeSleepClass(timeout).SleepThread();
class WakeSleepClass
bool locked = true;
readonly TimerDisposer timerDisposer = new TimerDisposer();
public WakeSleepClass(int sleepTime)
var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
timerDisposer.InternalTimer = timer;
public WakeSleepClass(TimeSpan sleepTime)
var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
timerDisposer.InternalTimer = timer;
public WakeSleepClass()
var timer = new Timer(WakeThread, timerDisposer, Timeout.Infinite, Timeout.Infinite);
timerDisposer.InternalTimer = timer;
public void SleepThread()
while (locked)
lock (timerDisposer) Monitor.Wait(timerDisposer);
locked = true;
public void WakeThread(object key)
locked = false;
lock (key) Monitor.Pulse(key);
((TimerDisposer)key).InternalTimer.Dispose();
class TimerDisposer
public Timer InternalTimer get; set;
class Program
private static readonly Queue<CancellationTokenSource> tokenSourceQueue = new Queue<CancellationTokenSource>();
static void Main(string[] args)
CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSourceQueue.Enqueue(tokenSource);
ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 10; i++)
Thread.Sleep(1000);
Task.Factory.StartNew(() =>
startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
for (int j = 0; j < 50; j++)
Task.Factory.StartNew(() => startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b));
for (int j = 0; j < 50; j++)
Task.Factory.StartNew(() =>
while (!tokenSource.Token.IsCancellationRequested)
if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
Thread.CurrentThread.Block(10);
if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
, tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.ContinueWith(task =>
WriteExceptions(task.Exception);
Console.WriteLine("-----------------------------");
, TaskContinuationOptions.OnlyOnFaulted);
Thread.CurrentThread.Block();
, tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.ContinueWith(task =>
WriteExceptions(task.Exception);
Console.WriteLine("-----------------------------");
, TaskContinuationOptions.OnlyOnFaulted);
Console.Read();
private static void WriteExceptions(Exception ex)
Console.WriteLine(ex.Message);
if (ex.InnerException != null)
WriteExceptions(ex.InnerException);
【讨论】:
【参考方案4】:试试这个:
var taskCompletionSource = new TaskCompletionSource<bool>();
Thread t = new Thread(() =>
try
Operation();
taskCompletionSource.TrySetResult(true);
catch (Exception e)
taskCompletionSource.TrySetException(e);
);
void Operation()
// Some work in thread
t.Start();
await taskCompletionSource.Task;
您还可以为 Action、Func 等编写扩展方法。 例如:
public static Task RunInThread(
this Action action,
Action<Thread> initThreadAction = null)
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
Thread thread = new Thread(() =>
try
action();
taskCompletionSource.TrySetResult(true);
catch (Exception e)
taskCompletionSource.TrySetException(e);
);
initThreadAction?.Invoke(thread);
thread.Start();
return taskCompletionSource.Task;
或
public static Task<TResult> RunInThread<T1, T2, TResult>(
this Func<T1, T2, TResult> function,
T1 param1,
T2 param2,
Action<Thread> initThreadAction = null)
TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>();
Thread thread = new Thread(() =>
try
TResult result = function(param1, param2);
taskCompletionSource.TrySetResult(result);
catch (Exception e)
taskCompletionSource.TrySetException(e);
);
initThreadAction?.Invoke(thread);
thread.Start();
return taskCompletionSource.Task;
并像这样使用它:
var result = await some_function.RunInThread(param1, param2).ConfigureAwait(true);
【讨论】:
【参考方案5】:只需用 new Thread() 启动线程,然后用 Start() 启动它们
static void Main(string[] args)
ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 10; i++)
new Thread(() =>
new Thread(() =>
startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
).Start();
for (int j = 0; j < 100; j++)
new Thread(() =>
while (true)
Thread.Sleep(10);
if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId))
Console.WriteLine("Thread reused");
).Start();
).Start();
Console.Read();
任务应该由调度程序管理。 Tasks 的整个想法是运行时将决定何时需要一个新线程。另一方面,如果您确实需要不同的线程,则代码中的其他内容可能是错误的,例如过度依赖 Thread.Sleep() 或线程本地存储。
正如所指出的,您可以创建自己的 TaskScheduler 并使用任务来创建线程,但是为什么要使用 Tasks 开始呢?
【讨论】:
我同意这一点,除非线程中运行的操作是异步任务。【参考方案6】:这是一个自定义TaskScheduler
,它在每个任务的专用线程上执行任务:
public class ThreadPerTask_TaskScheduler : TaskScheduler
protected override void QueueTask(Task task)
var thread = new Thread(() => TryExecuteTask(task));
thread.IsBackground = true;
thread.Start();
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
return TryExecuteTask(task);
protected override IEnumerable<Task> GetScheduledTasks() yield break;
使用示例:
var parallelOptions = new ParallelOptions()
MaxDegreeOfParallelism = 3,
TaskScheduler = new ThreadPerTask_TaskScheduler()
;
Parallel.ForEach(Enumerable.Range(1, 10), parallelOptions, item =>
Console.WriteLine($"DateTime.Now:HH:mm:ss.fff" +
$" [Thread.CurrentThread.ManagedThreadId]" +
$" Processing #item" +
(Thread.CurrentThread.IsBackground ? ", Background" : "") +
(Thread.CurrentThread.IsThreadPoolThread ? ", ThreadPool" : ""));
Thread.Sleep(1000); // Simulate CPU-bound work
);
输出:
20:38:56.770 [4] Processing #3, Background
20:38:56.770 [5] Processing #2, Background
20:38:56.770 [1] Processing #1
20:38:57.782 [1] Processing #4
20:38:57.783 [8] Processing #5, Background
20:38:57.783 [7] Processing #6, Background
20:38:58.783 [1] Processing #7
20:38:58.783 [10] Processing #8, Background
20:38:58.787 [9] Processing #9, Background
20:38:59.783 [1] Processing #10
Try it on Fiddle.
这个自定义TaskScheduler
也允许当前线程参与计算。这在上面的示例中通过线程[1]
处理项目#1
、#4
、#7
和#10
进行了演示。如果您不希望发生这种情况,只需将TryExecuteTaskInline
中的代码替换为return false;
。
另一个例子,以Task.Factory.StartNew
方法为特色。在 100 个不同的线程上启动 100 个任务:
var oneThreadPerTask = new ThreadPerTask_TaskScheduler();
Task[] tasks = Enumerable.Range(1, 100).Select(_ =>
return Task.Factory.StartNew(() =>
Thread.Sleep(1000); // Simulate long-running work
, default, TaskCreationOptions.None, oneThreadPerTask);
).ToArray();
在这种情况下,当前线程没有参与工作,因为所有任务都是通过调用它们的 Start
方法在幕后启动的,而不是
RunSynchronously
.
【讨论】:
以上是关于是否总是可以使用 Task 强制一个新线程?的主要内容,如果未能解决你的问题,请参考以下文章