我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?
Posted
技术标签:
【中文标题】我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?【英文标题】:I am getting a TaskCanceledException in a Parallel.ForEach loop, how to fix it? 【发布时间】:2021-10-06 11:57:20 【问题描述】:我在 C# 中运行 Parallel.ForEach
。我低于TaskCanceledException: A task was canceled
。谁能建议我在哪里可以解决我的任务取消或增加我的连接超时?
我的代码:
private static void Main()
var SQLServerName = "XXXXXXXXXX.database.windows.net";
var SQLServerAdmin = "XXXXXXXXXX";
var SQLServerAdminPasword = "XXXXXXXXXX";
var DatabaseName = "XXXXXXXXXX";
string SQLStatement = ($"DELETE FROM ResourceStatus WHERE ResourceType = 'RBAC' " +
$"TRUNCATE TABLE RBACStaging ");
Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
DatabaseName, SQLStatement);
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
builder.DataSource = SQLServerName;
builder.UserID = SQLServerAdmin;
builder.Password = SQLServerAdminPasword;
builder.InitialCatalog = DatabaseName;
List<string> Subscriptions = new List<string>();
using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
connection.Open();
StringBuilder sb = new StringBuilder();
sb.Append($"SELECT SubscriptionID from Subscriptions " +
$"where SOX = 'SR0' and Environment = 'Prod'");
string sql = sb.ToString();
using (SqlCommand command = new SqlCommand(sql, connection))
using (SqlDataReader Reader = command.ExecuteReader())
while (Reader.Read())
Subscriptions.Add(Reader.GetString(0).ToString());
connection.Close();
SqlConnection.ClearPool(connection);
Parallel.ForEach(Subscriptions, s =>
string SubscriptionID = s.Replace(" ", String.Empty);
RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
);
GetRBACSnapshot
方法:
public static async Task GetRBACSnapshot(string SubscriptionID)
var ClientID = "de9f7784-93e4-42d0-a68d-7f1457ce4e56";
var AppKey = "XXXXXXXX";
var SQLServerName = "XXXXXXXX.database.windows.net";
var SQLServerAdmin = "XXXXXXXX";
var SQLServerAdminPasword = "XXXXXXXX";
var DatabaseName = "XXXXXXXX";
string TenantID = null;
string ServiceGroupName = null;
string TeamGroupName = null;
string ServiceName = null;
string ServiceTreeID = null;
string Level = null;
string SOX = null;
string SubscriptionName = null;
string EnvironmentScope = null;
string Tenant = null;
DateTime RawDate = DateTime.Now;
string RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
string SQLStatement = null;
SQLStatement = ($"INSERT INTO ResourceStatus " +
$"SELECT 'RBAC', 'SubscriptionID', 'Started', 'RefreshedAt'");
Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
DatabaseName, SQLStatement);
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
builder.DataSource = SQLServerName;
builder.UserID = SQLServerAdmin;
builder.Password = SQLServerAdminPasword;
builder.InitialCatalog = DatabaseName;
using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
connection.Open();
StringBuilder sb = new StringBuilder();
sb.Append($"SELECT ServiceGroupName, TeamGroupName, ServiceName, " +
$"ServiceTreeID, Level, SOX, SubscriptionName, Environment, Tenant " +
$"from Subscriptions where SubscriptionID = 'SubscriptionID'");
string sql = sb.ToString();
using (SqlCommand command = new SqlCommand(sql, connection))
using (SqlDataReader Reader = command.ExecuteReader())
while (Reader.Read())
ServiceGroupName = Reader.GetString(0).ToString();
TeamGroupName = Reader.GetString(1).ToString();
ServiceName = Reader.GetString(2).ToString();
ServiceTreeID = Reader.GetString(3).ToString();
Level = Reader.GetString(4).ToString();
SOX = Reader.GetString(5).ToString();
SubscriptionName = Reader.GetString(6).ToString();
EnvironmentScope = Reader.GetString(7).ToString();
Tenant = Reader.GetString(8).ToString();
connection.Close();
SqlConnection.ClearPool(connection);
// AARE Logic
if (Tenant.Equals("AME", StringComparison.OrdinalIgnoreCase))
TenantID = "33e01921-4d64-4f8c-a055-5bdaffd5e33d";
else if (Tenant.Equals("GME", StringComparison.OrdinalIgnoreCase))
TenantID = "124edf19-b350-4797-aefc-3206115ffdb3";
else if (Tenant.Equals("PME", StringComparison.OrdinalIgnoreCase))
TenantID = "975f013f-7f24-47e8-a7d3-abc4752bf346";
else if (Tenant.Equals("Corp", StringComparison.OrdinalIgnoreCase))
TenantID = "72f988bf-86f1-41af-91ab-2d7cd011db47";
string AzToken = await Helper.GetAccessTokenAsync(TenantID, ClientID, AppKey);
string MSGraphToken = await Helper.GetGraphAccessToken(TenantID, ClientID, AppKey)
.ConfigureAwait(true);
var httpClient = new HttpClient
BaseAddress = new Uri("https://management.azure.com/subscriptions/")
;
string URI = $"SubscriptionID/providers/Microsoft.Authorization/roleAssignments?" +
$"api-version=2018-09-01-preview";
httpClient.DefaultRequestHeaders.Remove("Authorization");
httpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + AzToken);
HttpResponseMessage response = await httpClient.GetAsync(URI).ConfigureAwait(false);
var HttpsResponse = await response.Content.ReadAsStringAsync();
dynamic Result = JsonConvert.DeserializeObject<object>(HttpsResponse);
if (Result["value"] != null)
foreach (dynamic item in Result["value"])
string ObjectID = item.properties.principalId;
string ObjectType = item.properties.principalType;
string ObjectCategory = null;
if (ObjectType.Equals("ServicePrincipal", StringComparison.OrdinalIgnoreCase))
ObjectCategory = "servicePrincipals";
else if (ObjectType.Equals("User", StringComparison.OrdinalIgnoreCase))
ObjectCategory = "users";
else if (ObjectType.Equals("Group", StringComparison.OrdinalIgnoreCase))
ObjectCategory = "groups";
string AccessScope = item.properties.scope;
string DisplayName = await Helper.GetDisplayName(TenantID, ObjectID,
MSGraphToken, ObjectCategory);
string RoleDefinitionIDFullText = item.properties.roleDefinitionId;
string RoleDefinitionID = RoleDefinitionIDFullText.Substring(
RoleDefinitionIDFullText.Length - 36);
string AccessLevel = await Helper.GetRoleDefinitionName(RoleDefinitionID,
SubscriptionID, AzToken);
string ProvisionedByObjID = item.properties.createdBy;
string ProvisionedBy = await Helper.GetDisplayName(TenantID,
ProvisionedByObjID, MSGraphToken, "Decide");
string ProvisionedAt = item.properties.createdOn;
string Compliant = Helper.ComplianceValidation(SOX, EnvironmentScope,
DisplayName, ObjectType, AccessLevel, ProvisionedBy, AccessScope);
RawDate = DateTime.Now;
RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
string Action = null;
if (Compliant.Equals("Yes"))
Action = "No Action Needed";
else if (Compliant.Equals("No"))
Action = "Revoke";
else
Action = "Yet To Decide";
Console.WriteLine($"ServiceGroupName, TeamGroupName, ServiceName, " +
$"ServiceTreeID, Level, SOX, SubscriptionName, SubscriptionID, " +
$"EnvironmentScope, Tenant, DisplayName, ObjectType, ObjectID, " +
$"AccessLevel, ProvisionedBy, ProvisionedAt, AccessScope, " +
$"Compliant, Action, ProvisionedByObjID, RefreshedAt"); ;
SQLStatement = ($"INSERT INTO RBACStaging Select 'ServiceGroupName'," +
$"'TeamGroupName','ServiceName','ServiceTreeID','Level','SOX'" +
$",'SubscriptionName','SubscriptionID','EnvironmentScope'," +
$"'Tenant','DisplayName','ObjectType','ObjectID','AccessLevel'," +
$"'ProvisionedBy','ProvisionedAt','AccessScope','Compliant'," +
$"'Action','ProvisionedByObjID','RefreshedAt'");
try
Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
DatabaseName, SQLStatement);
catch (System.Data.SqlClient.SqlException sqlException)
Console.WriteLine(sqlException.Message);
// End of IF Condition to check Null value from Rest API Calls.
RawDate = DateTime.Now;
RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
SQLStatement = ($"update ResourceStatus Set Status = 'Completed', " +
$"LastUpdatedTime = 'RefreshedAt' where ResourceType = 'RBAC' and " +
$"SubscriptionID = 'SubscriptionID'");
Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
DatabaseName, SQLStatement);
// End of Method
例外:
TaskCanceledException: A task was canceled
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait()
at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList`1 list, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body)
at CPXFundamentals.Program.Main() in C:\Users\venkatag\source\repos\CPXFundamentals\Program.cs:line 47
【问题讨论】:
附注N++;
is not thread-safe,您在并行循环的主体内调用它。
关于您的主要问题,我们需要先查看RBACSnapshot.GetRBACSnapshot
方法,然后才能提供任何有用的建议。
这确实不是并行化异步函数的方法,请参阅***.com/questions/15136542/…
Theodor Zoulias:我添加了 GETRBACSnapshot 方法。有人可以告诉我如何解决“任务取消”问题。
我添加了我的 Main 方法和 GetRBACSnapshot 方法
【参考方案1】:
如果您无权访问 GetRBACSnapshot
的实现,您可能需要考虑捕获任何 TaskCanceledException
s。
例如,您可以捕获 TaskCanceledException
并将该订阅重新添加到队列中,以便稍后重试。
请确保不要消耗所有错误,而仅消耗TaskCanceledException
。您应该避免无意中消耗严重的运行时错误。
类似的东西可能对你有用:
int N = 1;
Parallel.ForEach(Subscriptions, s =>
string SubscriptionID = s.Replace(" ", String.Empty);
Console.WriteLine($"Working on N. SubscriptionID");
try
RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
Interlocked.Increment(ref N);
catch(TaskCanceledException)
Console.WriteLine($"SubscriptionID Timed out");
// do something else like add it back to a queue to be tried again
// just make sure any collection you add the subscription to is thread safe
);
【讨论】:
我怀疑OperationCancelledException
是一个错字。
Lmao 它确实是以上是关于我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?的主要内容,如果未能解决你的问题,请参考以下文章
如何正确调用 Parallel.ForEach 循环中的调用异步方法[重复]
为啥覆盖 Parallel.foreach 循环的 .NET 单元测试依赖于硬件?
带有 BlockingCollection.GetConsumableEnumerable 的 Parallel.ForEach 循环