如何高效地处理数百个项目中的数千个 C# 文件?
Posted
技术标签:
【中文标题】如何高效地处理数百个项目中的数千个 C# 文件?【英文标题】:How to process thousands of C# files across hundreds of projects efficiently? 【发布时间】:2021-11-07 09:37:47 【问题描述】:简介
我正在解决以下问题:
给定具有一个或多个公开可见常量的 C# 类型 X 解决方案中依赖于 X 类型中的常量的所有 C# 类型是什么?因为常量是在编译时内联的,所以检查二进制文件是没有意义的。我们需要使用 Roslyn API 检查源代码。
我不会使用语义分析,因为它会非常昂贵。相反,我将使用正则表达式来检查给定文件是否似乎使用了常量,然后使用语法树进行验证。不是防弹的,但足够好并且相对快。
目前的统计数据是:
项目数:333 文件数:45280 SSD 驱动器我的实现
整体方案是:
-
生成相关
Microsoft.Build.Evaluation.Project
对象的流,这将为我们提供 C# 文件列表
从 C# 文件生成 C# 文件内容流
对于每个 C# 文件内容匹配给定的正则表达式,并为每个匹配确定是否使用该 C# 文件内容的语法树使用相关常量。如果是肯定的 - 报告相应的类型。
我不想过多的细节,另一方面我想提供足够的细节来解释我的困难。不多,请多包涵。
我使用了几个小的辅助类型:
项目项目
private class ProjectItem
public readonly string AssemblyName;
public readonly CSharpParseOptions ParseOptions;
public readonly IEnumerable<string> CSFilePaths;
public ProjectItem(TypeMap typeMap, string asmName)
AssemblyName = asmName;
var asmProps = typeMap.Assemblies[asmName];
ParseOptions = asmProps.GetParseOptions();
CSFilePaths = new Project(asmProps.ProjectPath).GetItems("Compile").Select(item => item.GetMetadataValue("FullPath"));
public IEnumerable<TextItem> YieldTextItems() => CSFilePaths.Select(csFilePath => new TextItem(this, csFilePath, File.ReadAllText(csFilePath)));
TypeMap
是我们解决方案中使用的所有类型和程序集的注册表。其他一些代码之前已经构建了它。将其视为可以回答某些问题的预言机,例如“给我给定程序集的解析选项(或项目路径)”。但它没有指定项目使用的 C# 文件列表。为此,我们需要实例化相应的 Microsoft.Build.Evaluation.Project
实例。哪个很贵。
文本项
private class TextItem
public readonly string AssemblyName;
public readonly CSharpParseOptions ParseOptions;
public readonly string CSFilePath;
public readonly string Text;
public TextItem(ProjectItem item, string csFilePath, string text)
AssemblyName = item.AssemblyName;
ParseOptions = item.ParseOptions;
CSFilePath = csFilePath;
Text = text;
public IEnumerable<TypeDefKey> YieldDependentTypes(TypeMap typeMap, TypeDefKey constTypeDefKey, Regex regex)
...
SyntaxTree syntaxTree = null;
foreach (Match m in regex.Matches(Text))
if (syntaxTree == null)
syntaxTree = CSharpSyntaxTree.ParseText(Text, ParseOptions, CSFilePath);
...
...
if (IsTheRegexMatchIndeedCorrespondsToTheGivenConstantType(syntaxTree, ...))
var typeDefKey = GetTheType(syntaxTree, ...);
yield return typeDefKey;
鉴于上述类型,我想出了这个简单的 TPL 数据流管道:
var regex = GetRegex(...);
var dependentAssemblies = GetDependentAssemblies(...);
var dependentTypes = new ConcurrentDictionary<TypeDefKey, object>();
var produceCSFilePaths = new TransformManyBlock<ICollection<string>, ProjectItem>(asmNames => asmNames.Select(asmName => new ProjectItem(typeMap, asmName)));
var produceCSFileText = new TransformManyBlock<ProjectItem, TextItem>(p => p.YieldTextItems());
var produceDependentTypes = new TransformManyBlock<TextItem, TypeDefKey>(t => t.YieldDependentTypes(typeMap, constTypeDefKey, regex));
var getDependentTypes = new ActionBlock<TypeDefKey>(typeDefKey => dependentTypes.TryAdd(typeDefKey, null));
var linkOptions = new DataflowLinkOptions PropagateCompletion = true ;
produceCSFilePaths.LinkTo(produceCSFileText, linkOptions);
produceCSFileText.LinkTo(produceDependentTypes, linkOptions);
produceDependentTypes.LinkTo(getDependentTypes, linkOptions);
produceCSFilePaths.Post(dependentAssemblies);
produceCSFilePaths.Complete();
getDependentTypes.Completion.Wait();
问题和疑问
-
速度很慢 - 大约需要 50 秒,CPU 利用率低。我意识到这里有很多 IO,但仍然需要 CPU 来应用正则表达式并将内容解析到相应的语法树中。
我不明白如何将
TransformManyBlock
与异步 IO 一起使用。 ProjectItem.YieldTextItems
函数可以返回 IObservable<TextItem>
或 IAsyncEnumerable<TextItem>
,但 TransformManyBlock
不会识别它。我是 TPL Dataflow 的新手,所以我不清楚如何解决这个问题。这就是为什么我使用阻塞 File.ReadAllText
而不是 File.ReadAllTextAsync
。
我认为我的管道使用 ThreadPool 线程(通过默认的 TaskScheduler),但它不应该使用真正的线程吗?喜欢用Task.Factory.StartNew(..., TaskCreationOptions.LongRunning);
创建的那些?那么,它是否使用“正确的”线程?如果没有 - 如何解决?我已经看到了实施自定义 TaskScheduler
的建议,但我找不到示例。现有的似乎依赖于内部实现,所以不清楚如何进行。
我尝试为ProjectItem
和TextItem
生产增加MaxDegreeOfParallelism,因为这两个主要是IO,因此比最后一部分慢得多- 检查C# 文件内容。但它并没有在性能上产生太大的改进。据我了解,管道越慢,并行性就应该越高。另一方面,我不知道从 SSD 读取时可以有多少并行度。完全不清楚如何对其进行分析。
【问题讨论】:
看来这个问题太大太复杂了。最好把它分成小块,分别调查,问不同的问题 仅供参考,向TransformManyBlock
添加对IAsyncEnumerable<T>
的支持是微软的计划,但团队无法及时完成.NET 6,因此it has been postponed for .NET 7。目前,您可能会通过查看this 的答案获得一些关于如何超越此限制的想法。
@TheodorZoulias - 我想这个想法不是将produceCSFileText
链接到produceDependentTypes
,而是让前者使用SendAsync
方法发送TextItem
对象。然后手动处理两者之间的延续传播。
【参考方案1】:
这里是 TransformManyBlock<TInput,TOutput>
数据流组件的直接替换,其构造函数接受 Func<TInput, IAsyncEnumerable<TOutput>>
委托。它具有所有 API 并支持所有选项(但请参阅最后关于 BoundedCapacity
的警告)。它在内部由三个链接组件组成,TransformBlock
、ActionBlock
和 BufferBlock
:
public class TransformManyBlockEx<TInput, TOutput>
: IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
private readonly TransformBlock<TInput, (long, TInput)> _input;
private readonly ActionBlock<(long, TInput)> _transformer;
private readonly BufferBlock<TOutput> _output;
private readonly Dictionary<long, (Queue<TOutput> Queue, bool Completed)> _byIndex;
private readonly CancellationToken _cancellationToken;
private long currentIndex = 0L;
private long minIndex = 0L;
public TransformManyBlockEx(Func<TInput, IAsyncEnumerable<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
// Arguments validation omitted
dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();
_cancellationToken = dataflowBlockOptions.CancellationToken;
if (dataflowBlockOptions.EnsureOrdered)
_byIndex = new Dictionary<long, (Queue<TOutput>, bool)>();
_input = new TransformBlock<TInput, (long, TInput)>(item =>
return (currentIndex++, item); // No parallelism here
, new ExecutionDataflowBlockOptions()
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = _cancellationToken
);
_transformer = new ActionBlock<(long, TInput)>(async entry =>
var (index, item) = entry;
Queue<TOutput> queue = null;
if (_byIndex != null)
// EnsureOrdered is enabled
queue = new Queue<TOutput>();
lock (_byIndex) _byIndex.Add(index, (queue, false));
var resultSequence = transform(item);
await foreach (var result in resultSequence
.WithCancellation(_cancellationToken))
if (_byIndex != null)
lock (queue) queue.Enqueue(result);
if (!await SendPendingResultsAsync()) return;
else
if (!await _output.SendAsync(result, _cancellationToken)) return;
if (_byIndex != null)
lock (_byIndex) _byIndex[index] = (queue, true); // Mark as completed
await SendPendingResultsAsync();
, dataflowBlockOptions);
_input.LinkTo(_transformer, new() PropagateCompletion = true );
_output = new BufferBlock<TOutput>(dataflowBlockOptions);
Task transformerPostCompletion = _transformer.Completion.ContinueWith(t =>
if (_byIndex != null)
int pendingCount;
lock (_byIndex)
pendingCount = _byIndex.Count;
_byIndex.Clear(); // Cleanup
if (t.IsCompletedSuccessfully && pendingCount > 0)
throw new InvalidOperationException(
"The transformer completed before emitting all queued results.");
, TaskScheduler.Default);
// The Task.WhenAll aggregates nicely the exceptions of the two tasks
PropagateCompletion(
Task.WhenAll(_transformer.Completion, transformerPostCompletion),
_output);
private static async void PropagateCompletion(Task sourceCompletion,
IDataflowBlock target)
try await sourceCompletion.ConfigureAwait(false); catch
var ex = sourceCompletion.IsFaulted ? sourceCompletion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
private async Task<bool> SendPendingResultsAsync()
// Returns false in case the BufferBlock rejected a result
// This may happen in case of cancellation
while (TrySendNextPendingResult(out var sendTask))
if (!await sendTask) return false;
return true;
private bool TrySendNextPendingResult(out Task<bool> sendTask)
// Returns false in case currently there is no pending result
sendTask = null;
lock (_byIndex)
while (true)
if (!_byIndex.TryGetValue(minIndex, out var entry))
return false; // The next queue in not in the dictionary yet
var (queue, completed) = entry; // We found the next queue
lock (queue)
if (queue.TryDequeue(out var result))
// We found the next result
// Send the result while holding the lock on _byIndex
// The BufferBlock respects the order of submited items
sendTask = _output.SendAsync(result, _cancellationToken);
return true;
// Currently the queue is empty
// If it's not completed yet, return. It may have more items later.
if (!completed) return false;
// OK, the queue is now both empty and completed
_byIndex.Remove(minIndex); // Remove it
minIndex++; // Continue with the next queue in order
public TransformManyBlockEx(Func<TInput, Task<IEnumerable<TOutput>>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
: this(ToAsyncEnumerable(transform), dataflowBlockOptions)
public TransformManyBlockEx(Func<TInput, IEnumerable<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
: this(ToAsyncEnumerable(transform), dataflowBlockOptions)
public Task Completion => _output.Completion;
public void Complete() => _input.Complete();
void IDataflowBlock.Fault(Exception exception)
=> ((IDataflowBlock)_input).Fault(exception);
public int InputCount
=> _input.InputCount + _input.OutputCount + _transformer.InputCount;
public int OutputCount
get
int count = _output.Count;
if (_byIndex == null) return count;
lock (_byIndex) return count + _byIndex.Values
.Select(e => lock (e.Queue) return e.Queue.Count; ).Sum();
public IDisposable LinkTo(ITargetBlock<TOutput> target,
DataflowLinkOptions linkOptions)
=> _output.LinkTo(target, linkOptions);
public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
=> ((IReceivableSourceBlock<TOutput>)_output).TryReceive(filter, out item);
public bool TryReceiveAll(out IList<TOutput> items)
=> ((IReceivableSourceBlock<TOutput>)_output).TryReceiveAll(out items);
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
DataflowMessageHeader messageHeader, TInput messageValue,
ISourceBlock<TInput> source, bool consumeToAccept)
=> ((ITargetBlock<TInput>)_input).OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<TOutput> target, out bool messageConsumed)
=> ((ISourceBlock<TOutput>)_output).ConsumeMessage(messageHeader, target,
out messageConsumed);
bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<TOutput> target)
=> ((ISourceBlock<TOutput>)_output).ReserveMessage(messageHeader, target);
void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<TOutput> target)
=> ((ISourceBlock<TOutput>)_output).ReleaseReservation(messageHeader,
target);
private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
Func<TInput, Task<IEnumerable<TOutput>>> transform)
return (item) => Iterator(item);
async IAsyncEnumerable<TOutput> Iterator(TInput item)
foreach (var result in await transform(item)) yield return result;
private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
Func<TInput, IEnumerable<TOutput>> transform)
return (item) => Iterator(item);
async IAsyncEnumerable<TOutput> Iterator(TInput item)
foreach (var result in transform(item)) yield return result;
await Task.CompletedTask; // Suppress CS1998
主要困难在于EnsureOrdered
选项,因为我们不能依赖任何组合块的内置功能。选择的解决方案基于来自this 问题的想法:Dictionary
和long
键,结果按原始顺序存储,long
计数器保存尚未发出的最小索引。
此实现具有所有由组合构建的自定义数据流组件的常见缺点:
-
异常包含在额外的
AggregateException
s 中。这通常需要Flatten
管道中最后一个块传播的异常。
BoundedCapacity
乘以链接的内部组件的数量。 TransformManyBlockEx
的有效 BoundedCapacity
是选项中传递的值的三倍,加上存储在内部重新排序缓冲区中的结果(启用 EnsureOrdered
选项时)。这个缓冲区并没有真正的限制。不幸的是,修复这个缺陷并非易事。
【讨论】:
我将不得不摸索和测试它。可能需要一些时间。但是肯定会 +1。 @mark 假设它没有错误,它可能会作为一个临时解决方案,直到微软发布真实的东西(希望在 .NET 7 中)。 @mark 我在实现中添加了一个鲁棒性功能:如果内部transformer
块成功完成而没有发出所有排队的结果(由于一些错误),InvalidOperationException
会通过Completion
任务。
@mark 我刚刚在实现中发现了一个错误。当多个生产者同时向TransformManyBlockEx
发送项目时,它可能会死锁。它可能与ImmediateTaskScheduler
有关。我正在看。
把它发布到github上有意义吗?在微软实施之前似乎很有用。以上是关于如何高效地处理数百个项目中的数千个 C# 文件?的主要内容,如果未能解决你的问题,请参考以下文章