我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?

Posted

技术标签:

【中文标题】我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?【英文标题】:I am getting a TaskCanceledException in a Parallel.ForEach loop, how to fix it? 【发布时间】:2021-10-06 11:57:20 【问题描述】:

我在 C# 中运行 Parallel.ForEach。我低于TaskCanceledException: A task was canceled。谁能建议我在哪里可以解决我的任务取消或增加我的连接超时?

我的代码:

private static void Main()

    var SQLServerName = "XXXXXXXXXX.database.windows.net";
    var SQLServerAdmin = "XXXXXXXXXX";
    var SQLServerAdminPasword = "XXXXXXXXXX";
    var DatabaseName = "XXXXXXXXXX";

    string SQLStatement = ($"DELETE FROM ResourceStatus WHERE ResourceType = 'RBAC' " +
        $"TRUNCATE TABLE RBACStaging ");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = SQLServerName;
    builder.UserID = SQLServerAdmin;
    builder.Password = SQLServerAdminPasword;
    builder.InitialCatalog = DatabaseName;
    List<string> Subscriptions = new List<string>();

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    
        connection.Open();
        StringBuilder sb = new StringBuilder();
        sb.Append($"SELECT SubscriptionID from Subscriptions " +
            $"where SOX = 'SR0' and Environment = 'Prod'");
        string sql = sb.ToString();

        using (SqlCommand command = new SqlCommand(sql, connection))
        
            using (SqlDataReader Reader = command.ExecuteReader())
            
                while (Reader.Read())
                
                    Subscriptions.Add(Reader.GetString(0).ToString());
                
                connection.Close();
                SqlConnection.ClearPool(connection);
            
        
    
    Parallel.ForEach(Subscriptions, s =>
    
        string SubscriptionID = s.Replace(" ", String.Empty);
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
    );

GetRBACSnapshot方法:

public static async Task GetRBACSnapshot(string SubscriptionID)

    var ClientID = "de9f7784-93e4-42d0-a68d-7f1457ce4e56";
    var AppKey = "XXXXXXXX";
    var SQLServerName = "XXXXXXXX.database.windows.net";
    var SQLServerAdmin = "XXXXXXXX";
    var SQLServerAdminPasword = "XXXXXXXX";
    var DatabaseName = "XXXXXXXX";
    string TenantID = null;
    string ServiceGroupName = null;
    string TeamGroupName = null;
    string ServiceName = null;
    string ServiceTreeID = null;
    string Level = null;
    string SOX = null;
    string SubscriptionName = null;
    string EnvironmentScope = null;
    string Tenant = null;

    DateTime RawDate = DateTime.Now;
    string RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
    string SQLStatement = null;
    SQLStatement = ($"INSERT INTO ResourceStatus " +
                    $"SELECT 'RBAC', 'SubscriptionID', 'Started', 'RefreshedAt'");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = SQLServerName;
    builder.UserID = SQLServerAdmin;
    builder.Password = SQLServerAdminPasword;
    builder.InitialCatalog = DatabaseName;

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    
        connection.Open();
        StringBuilder sb = new StringBuilder();
        sb.Append($"SELECT ServiceGroupName, TeamGroupName, ServiceName, " +
            $"ServiceTreeID, Level, SOX, SubscriptionName, Environment, Tenant " +
            $"from Subscriptions where SubscriptionID = 'SubscriptionID'");
        string sql = sb.ToString();

        using (SqlCommand command = new SqlCommand(sql, connection))
        
            using (SqlDataReader Reader = command.ExecuteReader())
            
                while (Reader.Read())
                
                    ServiceGroupName = Reader.GetString(0).ToString();
                    TeamGroupName = Reader.GetString(1).ToString();
                    ServiceName = Reader.GetString(2).ToString();
                    ServiceTreeID = Reader.GetString(3).ToString();
                    Level = Reader.GetString(4).ToString();
                    SOX = Reader.GetString(5).ToString();
                    SubscriptionName = Reader.GetString(6).ToString();
                    EnvironmentScope = Reader.GetString(7).ToString();
                    Tenant = Reader.GetString(8).ToString();
                
            
        
        connection.Close();
        SqlConnection.ClearPool(connection);
    

    // AARE Logic
    if (Tenant.Equals("AME", StringComparison.OrdinalIgnoreCase))
    
        TenantID = "33e01921-4d64-4f8c-a055-5bdaffd5e33d";
    
    else if (Tenant.Equals("GME", StringComparison.OrdinalIgnoreCase))
    
        TenantID = "124edf19-b350-4797-aefc-3206115ffdb3";
    
    else if (Tenant.Equals("PME", StringComparison.OrdinalIgnoreCase))
    
        TenantID = "975f013f-7f24-47e8-a7d3-abc4752bf346";
    
    else if (Tenant.Equals("Corp", StringComparison.OrdinalIgnoreCase))
    
        TenantID = "72f988bf-86f1-41af-91ab-2d7cd011db47";
    
    string AzToken = await Helper.GetAccessTokenAsync(TenantID, ClientID, AppKey);
    string MSGraphToken = await Helper.GetGraphAccessToken(TenantID, ClientID, AppKey)
        .ConfigureAwait(true);

    var httpClient = new HttpClient
    
        BaseAddress = new Uri("https://management.azure.com/subscriptions/")
    ;
    string URI = $"SubscriptionID/providers/Microsoft.Authorization/roleAssignments?" +
        $"api-version=2018-09-01-preview";
    httpClient.DefaultRequestHeaders.Remove("Authorization");
    httpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + AzToken);
    HttpResponseMessage response = await httpClient.GetAsync(URI).ConfigureAwait(false);

    var HttpsResponse = await response.Content.ReadAsStringAsync();
    dynamic Result = JsonConvert.DeserializeObject<object>(HttpsResponse);

    if (Result["value"] != null)
    
        foreach (dynamic item in Result["value"])
        
            string ObjectID = item.properties.principalId;
            string ObjectType = item.properties.principalType;
            string ObjectCategory = null;

            if (ObjectType.Equals("ServicePrincipal", StringComparison.OrdinalIgnoreCase))
            
                ObjectCategory = "servicePrincipals";
            

            else if (ObjectType.Equals("User", StringComparison.OrdinalIgnoreCase))
            
                ObjectCategory = "users";
            

            else if (ObjectType.Equals("Group", StringComparison.OrdinalIgnoreCase))
            
                ObjectCategory = "groups";
            

            string AccessScope = item.properties.scope;
            string DisplayName = await Helper.GetDisplayName(TenantID, ObjectID,
                MSGraphToken, ObjectCategory);
            string RoleDefinitionIDFullText = item.properties.roleDefinitionId;
            string RoleDefinitionID = RoleDefinitionIDFullText.Substring(
                RoleDefinitionIDFullText.Length - 36);
            string AccessLevel = await Helper.GetRoleDefinitionName(RoleDefinitionID,
                SubscriptionID, AzToken);
            string ProvisionedByObjID = item.properties.createdBy;
            string ProvisionedBy = await Helper.GetDisplayName(TenantID,
                ProvisionedByObjID, MSGraphToken, "Decide");
            string ProvisionedAt = item.properties.createdOn;
            string Compliant = Helper.ComplianceValidation(SOX, EnvironmentScope,
                DisplayName, ObjectType, AccessLevel, ProvisionedBy, AccessScope);
            RawDate = DateTime.Now;
            RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
            string Action = null;
            if (Compliant.Equals("Yes"))
            
                Action = "No Action Needed";
            
            else if (Compliant.Equals("No"))
            
                Action = "Revoke";
            
            else
            
                Action = "Yet To Decide";
            

            Console.WriteLine($"ServiceGroupName, TeamGroupName, ServiceName, " +
                $"ServiceTreeID, Level, SOX, SubscriptionName, SubscriptionID, " +
                $"EnvironmentScope, Tenant, DisplayName, ObjectType, ObjectID, " +
                $"AccessLevel, ProvisionedBy, ProvisionedAt, AccessScope, " +
                $"Compliant, Action, ProvisionedByObjID, RefreshedAt"); ;

            SQLStatement = ($"INSERT INTO RBACStaging Select 'ServiceGroupName'," +
                $"'TeamGroupName','ServiceName','ServiceTreeID','Level','SOX'" +
                $",'SubscriptionName','SubscriptionID','EnvironmentScope'," +
                $"'Tenant','DisplayName','ObjectType','ObjectID','AccessLevel'," +
                $"'ProvisionedBy','ProvisionedAt','AccessScope','Compliant'," +
                $"'Action','ProvisionedByObjID','RefreshedAt'");
            try
            
                Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
                    DatabaseName, SQLStatement);
            
            catch (System.Data.SqlClient.SqlException sqlException)
            
                Console.WriteLine(sqlException.Message);
            
        
     // End of IF Condition to check Null value from Rest API Calls.
    RawDate = DateTime.Now;
    RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
    SQLStatement = ($"update ResourceStatus Set Status = 'Completed', " +
        $"LastUpdatedTime = 'RefreshedAt' where ResourceType = 'RBAC' and " +
        $"SubscriptionID = 'SubscriptionID'");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);
 // End of Method

例外:

TaskCanceledException: A task was canceled
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList`1 list, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body)
   at CPXFundamentals.Program.Main() in C:\Users\venkatag\source\repos\CPXFundamentals\Program.cs:line 47

【问题讨论】:

附注N++; is not thread-safe,您在并行循环的主体内调用它。 关于您的主要问题,我们需要先查看RBACSnapshot.GetRBACSnapshot 方法,然后才能提供任何有用的建议。 这确实不是并行化异步函数的方法,请参阅***.com/questions/15136542/… Theodor Zoulias:我添加了 GETRBACSnapshot 方法。有人可以告诉我如何解决“任务取消”问题。 我添加了我的 Main 方法和 GetRBACSnapshot 方法 【参考方案1】:

如果您无权访问 GetRBACSnapshot 的实现,您可能需要考虑捕获任何 TaskCanceledExceptions。

例如,您可以捕获 TaskCanceledException 并将该订阅重新添加到队列中,以便稍后重试。

请确保不要消耗所有错误,而仅消耗TaskCanceledException。您应该避免无意中消耗严重的运行时错误。

类似的东西可能对你有用:

int N = 1;
Parallel.ForEach(Subscriptions, s =>

    string SubscriptionID = s.Replace(" ", String.Empty);
    Console.WriteLine($"Working on N. SubscriptionID");
    
    try
    
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
        Interlocked.Increment(ref N);
    
    catch(TaskCanceledException)
    
        Console.WriteLine($"SubscriptionID Timed out");
        // do something else like add it back to a queue to be tried again
        // just make sure any collection you add the subscription to is thread safe
    
);

【讨论】:

我怀疑OperationCancelledException 是一个错字。 Lmao 它确实是

以上是关于我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?的主要内容,如果未能解决你的问题,请参考以下文章

.Net 中的多个 Parallel.ForEach 循环

在 parallel.ForEach 循环中获取线程 ID

在 parallel.foreach 循环中捕获的变量

如何正确调用 Parallel.ForEach 循环中的调用异步方法[重复]

为啥覆盖 Parallel.foreach 循环的 .NET 单元测试依赖于硬件?

带有 BlockingCollection.GetConsumableEnumerable 的 Parallel.ForEach 循环