如何防止任务的同步延续?

Posted

技术标签:

【中文标题】如何防止任务的同步延续?【英文标题】:How can I prevent synchronous continuations on a Task? 【发布时间】:2014-04-30 00:39:18 【问题描述】:

我有一些库(套接字网络)代码,它提供基于 Task 的 API,用于基于 TaskCompletionSource<T> 对请求的未决响应。然而,TPL 中有一个烦恼,因为它似乎不可能阻止同步延续。我希望 能够做到的是:

告诉TaskCompletionSource<T> 不允许调用者附加TaskContinuationOptions.ExecuteSynchronously,或者 将结果 (SetResult / TrySetResult) 设置为指定应忽略 TaskContinuationOptions.ExecuteSynchronously,改用池

具体来说,我遇到的问题是传入的数据正在由专用阅读器处理,如果调用者可以使用TaskContinuationOptions.ExecuteSynchronously 附加,他们可能会停止阅读器(这不仅会影响他们)。以前,我通过一些检测是否存在任何延续的黑客来解决这个问题,如果存在,它将完成推送到ThreadPool,但是如果调用者已经饱和了它们,这会产生重大影响工作队列,因为完成不会得到及时处理。如果他们使用Task.Wait()(或类似的),那么他们将基本上陷入僵局。同样,这就是阅读器使用专用线程而不是使用工作线程的原因。

所以;在我试图唠叨 TPL 团队之前:我错过了一个选项吗?

关键点:

我不希望外部调用者能够劫持我的线程 我不能使用ThreadPool 作为实现,因为它需要在池饱和时工作

以下示例产生输出(排序可能因时间而异):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

问题在于随机调用者设法在“主线程”上获得延续。在真正的代码中,这会打断初级读者;坏事!

代码:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program

    static void Identify()
    
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    
    static void Main()
    
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate 
            Identify();
        );
        task.ContinueWith(delegate 
            Identify();
        , TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    

【问题讨论】:

我会尝试用我自己的 API 包装TaskCompletionSource 以防止直接调用ContinueWith,因为TaskCompletionSourceTask 都不适合从它们继承。 @Dennis 要清楚,实际上暴露的是Task,而不是TaskCompletionSource。这(暴露不同的 API)技术上是一种选择,但为此做的事情非常极端……我不确定它是否合理 @MattH 不是真的 - 它只是重新表述了这个问题:要么为此使用 ThreadPool(我已经提到过 - 它会导致问题),要么你有一个专门的“待处理的延续”线程,然后他们(指定ExecuteSynchronously 的延续)可以劫持那个而不是 - 这会导致完全相同的问题,因为这意味着其他消息的延续可能会停止,这再次影响多个调用者跨度> @Andrey(它的工作方式就像所有调用者都使用 ContinueWith 而不使用 exec-sync)正是我想要实现的。问题是,如果我的图书馆给某人一个任务,他们可能会做一些非常不受欢迎的事情:他们可以通过(不明智地)使用 exec-sync 来打断我的读者。这是非常危险的,这就是为什么我想阻止它在库内 @Andrey 因为a:很多任务一开始就永远不会得到延续(尤其是在做批处理工作时) - 这将迫使 每个 任务都有一个,并且 b :即使是那些本来可以继续的,现在也有更多的复杂性、开销和工作人员操作。这很重要。 【参考方案1】:

.NET 4.6 中的新功能:

.NET 4.6 包含一个新的TaskCreationOptionsRunContinuationsAsynchronously


既然您愿意使用反射来访问私有字段...

您可以使用TASK_STATE_THREAD_WAS_ABORTED 标志标记 TCS 的任务,这将导致所有延续都不会被内联。

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

编辑:

我建议你使用表达式,而不是使用反射发射。这更具可读性,并且具有与 PCL 兼容的优势:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

不使用反射:

如果有人感兴趣,我想出了一种不用反射的方法,但它也有点“脏”,当然还有不可忽略的性能惩罚:

try

    Thread.CurrentThread.Abort();

catch (ThreadAbortException)

    source.TrySetResult(123);
    Thread.ResetAbort();

【讨论】:

@MarcGravell 使用它为 TPL 团队创建一些伪样本,并提出关于能够通过构造函数选项或其他方式执行此操作的更改请求。 @Adam 是的,如果您不得不将此标志称为“它的作用”而不是“导致它的原因”,它将类似于 TaskCreationOptions.DoNotInline - 甚至不需要 ctor 签名更改为TaskCompletionSource @AdamHouldsworth 别担心,我已经给他们发了同样的邮件;p 为了您的兴趣:在这里,通过ILGenerator 等优化:github.com/StackExchange/StackExchange.Redis/blob/master/… @Noseratio 是的,检查了它们-谢谢;他们都可以,IMO;我同意这是纯粹的解决方法,但结果完全正确。【参考方案2】:

simulate abort 方法看起来确实不错,但导致 TPL 劫持线程 in some scenarios

然后我有一个类似于 checking the continuation object 的实现,但只是检查 any 延续,因为实际上有太多场景让给定的代码运行良好,但这意味着即使事情像 Task.Wait 这样会导致线程池查找。

最终,在检查了很多 IL 之后,唯一安全且有用的场景是 SetOnInvokeMres 场景(manual-reset-event-slim 延续)。还有很多其他场景:

有些不安全,会导致线程劫持 其余的没有用,因为它们最终会导致线程池

所以最后,我选择检查一个非空的延续对象;如果它为空,很好(没有延续);如果它是非空的,则对SetOnInvokeMres 进行特殊情况检查 - 如果是这样:很好(可以安全调用);否则,让线程池执行TrySetComplete,而不告诉任务执行任何特殊的操作,例如欺骗中止。 Task.Wait 使用SetOnInvokeMres 方法,这是我们想要真正努力尝试不死锁的具体场景。

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)

    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[]  typeof(Task) , typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

【讨论】:

【参考方案3】:

更新,我发布了一个separate answer 来处理ContinueWith 而不是await(因为ContinueWith 不关心当前的同步上下文)。

您可以使用哑同步上下文在通过在 TaskCompletionSource 上调用 SetResult/SetCancelled/SetException 触发的延续上施加异步。我相信当前的同步上下文(在await tcs.Task 处)是 TPL 用来决定是否使此类延续同步或异步的标准。

以下对我有用:

if (notifyAsync)

    tcs.SetResultAsync(null);

else

    tcs.SetResult(null);

SetResultAsync 是这样实现的:

public static class TaskExt

    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext()  

        public static FakeSynchronizationContext Instance  get  return s_context.Value;  

        public static void Execute(Action action)
        
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            
                action();
            
            finally
            
                SynchronizationContext.SetSynchronizationContext(savedContext);
            
        

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        
            return this;
        

        public override void OperationStarted()
        
            throw new NotImplementedException("OperationStarted");
        

        public override void OperationCompleted()
        
            throw new NotImplementedException("OperationCompleted");
        

        public override void Post(SendOrPostCallback d, object state)
        
            throw new NotImplementedException("Post");
        

        public override void Send(SendOrPostCallback d, object state)
        
            throw new NotImplementedException("Send");
        
    

SynchronizationContext.SetSynchronizationContext is very cheap 就其增加的开销而言。其实implementation of WPF Dispatcher.BeginInvoke采取了非常类似的做法。

TPL 将await 处的目标同步上下文与tcs.SetResult 处的目标同步上下文进行比较。如果同步上下文相同(或者两个地方都没有同步上下文),则直接同步调用延续。否则,它在目标同步上下文中使用SynchronizationContext.Post 排队,即正常的await 行为。这种方法的作用总是强加SynchronizationContext.Post 行为(或者如果没有目标同步上下文,则池线程继续)。

更新,这不适用于task.ContinueWith,因为ContinueWith 不关心当前的同步上下文。但是它适用于await task (fiddle)。它也适用于await task.ConfigureAwait(false)

OTOH,this approach 为 ContinueWith 工作。

【讨论】:

很诱人,但更改同步上下文几乎肯定会影响调用应用程序 - 例如,恰好正在使用我的库的 Web 或 Windows 应用程序不应该发现同步上下文更改了数百次每秒。 @MarcGravell,我只在tcs.SetResult 调用的范围内更改它。这种方式有点成为原子和线程安全的,因为延续本身将发生在 另一个池线程或原始同步上。在await tcs.Task 捕获的上下文。而且SynchronizationContext.SetSynchronizationContext本身很便宜,比线程切换本身便宜很多。 这可能无法满足您的第二个要求:不使用ThreadPool。使用此解决方案,如果没有同步,TPL 确实会使用ThreadPoolawait tcs.Task 的上下文(或者它是基本的默认值)。但这是标准的 TPL 行为。 嗯...因为同步上下文是每个线程的,这实际上可能是可行的——我不需要继续切换 ctx——只需为工作线程设置一次。我需要玩它 @Noseration 啊,对:不清楚关键点是他们不同。将要看。谢谢。【参考方案4】:

我认为 TPL 中没有任何东西可以提供对TaskCompletionSource.SetResult 延续的显式 API 控制。我决定保留我的initial answer 来控制async/await 场景的这种行为。

这是另一个对ContinueWith 施加异步的解决方案,如果tcs.SetResult 触发的延续发生在调用SetResult 的同一线程上:

public static class TaskExt

    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        
            @this.SetResult(result);
        
        finally
        
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        
    

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            
            else
            
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            
        ), continuationOptions).Unwrap();
    

已更新以解决评论:

我无法控制调用者 - 我无法让他们使用特定的 continue-with 变体:如果可以,问题将不存在 第一名

我不知道你不能控制来电者。不过,如果您不控制它,您可能也不会将TaskCompletionSource 对象直接 传递给调用者。从逻辑上讲,您将传递其中的 token 部分,即tcs.Task。在这种情况下,通过在上面添加另一个扩展方法,解决方案可能会更容易:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)

    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        
        else
        
            return antecedent;
        
    ), TaskContinuationOptions.ExecuteSynchronously).Unwrap();

用途:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate

    Identify();
, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

这实际上适用于awaitContinueWith (fiddle) 并且没有反射黑客。

【讨论】:

我无法控制调用者 - 我无法让他们使用特定的 continue-with 变体:如果可以的话,问题一开始就不会存在 @MarcGravell,我不知道你不能控制来电者。我发布了关于如何处理它的更新。 图书馆作者的困境 ;p 请注意,有人找到了一种更简单、更直接的方法来达到预期的结果【参考方案5】:

如果您可以并且准备好使用反射,那么应该这样做;

public static class MakeItAsync

    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        

        source.TrySetResult(result);
            

【讨论】:

这个 hack 可能会在框架的下一个版本中停止工作。 @Noseratio,是的,但它现在可以工作,他们也可能会在下一个版本中实现正确的方法来做到这一点 但是如果你能做到Task.Run(() =&gt; tcs.SetResult(result)),你为什么需要这个? @Noseratio,我不知道,向 Marc 提出这个问题 :-),我只是在连接到 TaskCompletionSource 的所有任务上删除标志 TaskContinuationOptions.ExecuteSynchronously 以确保它们都使用线程池代替主线程 m_continuationObject hack 实际上是我已经用来识别潜在问题任务的作弊方法 - 所以这不是超出考虑范围的。有趣,谢谢。这是迄今为止最有用的选项。【参考方案6】:

不如干点什么

var task = source.Task;

你可以这样做

var task = source.Task.ContinueWith<Int32>( x => x.Result );

因此,您总是添加一个将异步执行的延续,然后订阅者是否想要在同一上下文中延续并不重要。这有点像在执行任务,不是吗?

【讨论】:

出现在 cmets 中(参见 Andrey);问题那里是它强制所有任务在原本不会有的情况下继续进行,这是ContinueWithawait通常会努力的事情避免(通过检查已经完成等) - 因为这会将一切强加给工人,它实际上会加剧这种情况。这是一个积极的想法,我感谢你:但在这种情况下它不会有帮助。

以上是关于如何防止任务的同步延续?的主要内容,如果未能解决你的问题,请参考以下文章

如何在子任务上使用延续返回 Task<Object>

防止并发问题

JavaScript事件循环:同步任务和异步任务

防止 DbContext 被释放,直到异步任务完成

合并不同类型的 .NET 4.0 任务/延续

如何确保 Celery 任务是防止重叠的 Celery 任务执行