使用 CancellationToken.None 使用 ConcurrentAppendAsync 并发追加内容时的 TaskCanceledException

Posted

技术标签:

【中文标题】使用 CancellationToken.None 使用 ConcurrentAppendAsync 并发追加内容时的 TaskCanceledException【英文标题】:TaskCanceledException when appending content concurrent using ConcurrentAppendAsync using CancellationToken.None 【发布时间】:2018-01-07 04:29:04 【问题描述】:

我编写了一个日志系统,它将 json 格式的日志写入 Azure Data Lake 中的文件。多个线程正在写入该文件,因此我使用 ConcurrentAppendAsync 方法。

我这样初始化访问:

var clientCredential = new ClientCredential(ClientId, ClientSecret);
var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult();
fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds);
fileSystem = fileSystemClient.FileSystem;

我这样写内容:

    private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents)
    
        var logEvent = logEvents.FirstOrDefault();
        if (logEvent == null)
            return;

        var filename = string.Format(@"Logs\0:yyyy\0:MM\0:dd\1\2.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId);

        var jsonBuilder = new StringBuilder();

        foreach (var @event in logEvents)
        
            jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None));
        

        try
        
            await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None);
        
        catch (Exception ex)
        
            telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"nameof(DataLake): Failed to write filename", ex)));
        
    

问题是,虽然我将CancellationToken.None 传递给fileSystem.ConcurrentAppendAsync(...),但我的catch 块中仍然有一个TaskCanceledException

系统异常: System.Threading.Tasks.TaskCanceledException: 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 Microsoft.Rest.RetryDelegatingHandler+c__DisplayClass11_0+d.MoveNext(Microsoft.Rest.ClientRuntime,版本=2.0.0.0,文化=中性,PublicKeyToken=31bf3856ad364e35) 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 Microsoft.Rest.RetryDelegatingHandler+d__11.MoveNext(Microsoft.Rest.ClientRuntime,版本=2.0.0.0,文化=中性,PublicKeyToken=31bf3856ad364e35) 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Net.Http.HttpClient+d__58.MoveNext(System.Net.Http,版本=4.1.1.1,文化=中性,PublicKeyToken=b03f5f7f11d50a3a) 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 Microsoft.Azure.Management.DataLake.Store.FileSystemOperations+d__10.MoveNext(Microsoft.Azure.Management.DataLake.Store,版本=2.0.0.0,文化=中性,PublicKeyToken=31bf3856ad364e35) 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 Microsoft.Azure.Management.DataLake.Store.FileSystemOperationsExtensions+d__3.MoveNext(Microsoft.Azure.Management.DataLake.Store,版本=2.0.0.0,文化=中性,PublicKeyToken=31bf3856ad364e35) 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,版本=4.0.0.0,文化=中性,PublicKeyToken=b77a5c561934e089) 在 DHS.PlanCare.CentralLogging.EventHub.Processors.StreamProcessors.DataLake+d__15.MoveNext(DHS.PlanCare.CentralLogging.EventHub.Processors,版本=1.0.0.0,文化=中性,PublicKeyToken=null:D:\Sources\DHS\ DHS.PlanCare.ManagementPortal\DHS.PlanCare.ManagementPortal.EventHub.Processors\StreamProcessors\DataLake.cs:95)

(第 95 行是对ConcurrentAppendAsync 的调用)

这里会发生什么? the docs 对这个异常不说什么也没有用。文档中甚至有一个错误,因为它指出当没有传递 CancellationToken 时,值将是 null,这是不可能的,因为 CancellationToken 是一个值类型,因此不能是 null

所以我确实知道我不会发起任何形式的取消。问题是,我是否可以安全地忽略此异常,或者根本不会写入数据。由于摄取的数据量很大,而且有时会发生这种情况,因此很难检查数据是否实际写入。

【问题讨论】:

不知道为什么因为“为什么这段代码不起作用”而投票关闭。我真的想不出更多可以附加到这篇文章的内容了。我有一个特定的问题和错误,但我确实意识到它不能轻易复制,但这就是为什么我希望找到曾经使用过这项技术并曾经处理过它的人。 SDK 客户端在等待服务器对 ConcurrentAppend 调用的响应时可能达到了超时限制。您可以配置此超时when creating the DataLakeStoreFileSystemManagementClient。 在一次 ConcurrentAppend 调用中最多传输多少数据? @MattH 理论上约为 256kb,因为这是发送到 Azure 事件中心的批处理消息的最大大小。但我希望每个文件的大小会更少,大约 20kb。 【参考方案1】:

HttpClient,在内部使用,超时时抛出一个OperationCanceledException。这些超时可能发生在活动连接期间或尝试建立连接时。 DataLakeStoreFileSystemManagementClient 有一个构造函数,它带有一个可选的clientTimeoutInMinutes 参数,用于设置HttpClient.Timeout

在这种情况下,实际上是 Microsoft.Rest.RetryDelegatingHandler 响应 HttpClient 设置的取消引发了异常。

它通常也来自内部CancellationTokenSource.CancelAfter,也可以用于超时。

由于区分超时和真正的取消请求很有用,我通常会这样包装:

try

    ...

catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested)

    throw new TimeoutException("The operation timed out.", ex);

【讨论】:

以上是关于使用 CancellationToken.None 使用 ConcurrentAppendAsync 并发追加内容时的 TaskCanceledException的主要内容,如果未能解决你的问题,请参考以下文章

在订阅回调中结束订阅

模拟 HttpClient.SendAsync 以返回内容不为空的响应

测试使用

第一篇 用于测试使用

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇