如何高效地处理数百个项目中的数千个 C# 文件?

Posted

技术标签:

【中文标题】如何高效地处理数百个项目中的数千个 C# 文件?【英文标题】:How to process thousands of C# files across hundreds of projects efficiently? 【发布时间】:2021-11-07 09:37:47 【问题描述】:

简介

我正在解决以下问题:

给定具有一个或多个公开可见常量的 C# 类型 X 解决方案中依赖于 X 类型中的常量的所有 C# 类型是什么?

因为常量是在编译时内联的,所以检查二进制文件是没有意义的。我们需要使用 Roslyn API 检查源代码。

我不会使用语义分析,因为它会非常昂贵。相反,我将使用正则表达式来检查给定文件是否似乎使用了常量,然后使用语法树进行验证。不是防弹的,但足够好并且相对快。

目前的统计数据是:

项目数:333 文件数:45280 SSD 驱动器

我的实现

整体方案是:

    生成相关Microsoft.Build.Evaluation.Project 对象的流,这将为我们提供 C# 文件列表 从 C# 文件生成 C# 文件内容流 对于每个 C# 文件内容匹配给定的正则表达式,并为每个匹配确定是否使用该 C# 文件内容的语法树使用相关常量。如果是肯定的 - 报告相应的类型。

我不想过多的细节,另一方面我想提供足够的细节来解释我的困难。不多,请多包涵。

我使用了几个小的辅助类型:

项目项目

private class ProjectItem

    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly IEnumerable<string> CSFilePaths;

    public ProjectItem(TypeMap typeMap, string asmName)
    
        AssemblyName = asmName;
        var asmProps = typeMap.Assemblies[asmName];
        ParseOptions = asmProps.GetParseOptions();
        CSFilePaths = new Project(asmProps.ProjectPath).GetItems("Compile").Select(item => item.GetMetadataValue("FullPath"));
    

    public IEnumerable<TextItem> YieldTextItems() => CSFilePaths.Select(csFilePath => new TextItem(this, csFilePath, File.ReadAllText(csFilePath)));

TypeMap 是我们解决方案中使用的所有类型和程序集的注册表。其他一些代码之前已经构建了它。将其视为可以回答某些问题的预言机,例如“给我给定程序集的解析选项(或项目路径)”。但它没有指定项目使用的 C# 文件列表。为此,我们需要实例化相应的 Microsoft.Build.Evaluation.Project 实例。哪个很贵。

文本项

private class TextItem

    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly string CSFilePath;
    public readonly string Text;

    public TextItem(ProjectItem item, string csFilePath, string text)
    
        AssemblyName = item.AssemblyName;
        ParseOptions = item.ParseOptions;
        CSFilePath = csFilePath;
        Text = text;
    

    public IEnumerable<TypeDefKey> YieldDependentTypes(TypeMap typeMap, TypeDefKey constTypeDefKey, Regex regex)
    
        ...
        SyntaxTree syntaxTree = null;
        foreach (Match m in regex.Matches(Text))
        
            if (syntaxTree == null)
            
                syntaxTree = CSharpSyntaxTree.ParseText(Text, ParseOptions, CSFilePath);
                ...
            

            ...
            if (IsTheRegexMatchIndeedCorrespondsToTheGivenConstantType(syntaxTree, ...))
            
                var typeDefKey = GetTheType(syntaxTree, ...);
                yield return typeDefKey;
            
        
    

鉴于上述类型,我想出了这个简单的 TPL 数据流管道:

var regex = GetRegex(...);
var dependentAssemblies = GetDependentAssemblies(...);
var dependentTypes = new ConcurrentDictionary<TypeDefKey, object>();

var produceCSFilePaths = new TransformManyBlock<ICollection<string>, ProjectItem>(asmNames => asmNames.Select(asmName => new ProjectItem(typeMap, asmName)));
var produceCSFileText = new TransformManyBlock<ProjectItem, TextItem>(p => p.YieldTextItems());
var produceDependentTypes = new TransformManyBlock<TextItem, TypeDefKey>(t => t.YieldDependentTypes(typeMap, constTypeDefKey, regex));
var getDependentTypes = new ActionBlock<TypeDefKey>(typeDefKey => dependentTypes.TryAdd(typeDefKey, null));

var linkOptions = new DataflowLinkOptions  PropagateCompletion = true ;

produceCSFilePaths.LinkTo(produceCSFileText, linkOptions);
produceCSFileText.LinkTo(produceDependentTypes, linkOptions);
produceDependentTypes.LinkTo(getDependentTypes, linkOptions);

produceCSFilePaths.Post(dependentAssemblies);
produceCSFilePaths.Complete();
getDependentTypes.Completion.Wait();

问题和疑问

    速度很慢 - 大约需要 50 秒,CPU 利用率低。我意识到这里有很多 IO,但仍然需要 CPU 来应用正则表达式并将内容解析到相应的语法树中。 我不明白如何将 TransformManyBlock 与异步 IO 一起使用。 ProjectItem.YieldTextItems 函数可以返回 IObservable&lt;TextItem&gt;IAsyncEnumerable&lt;TextItem&gt;,但 TransformManyBlock 不会识别它。我是 TPL Dataflow 的新手,所以我不清楚如何解决这个问题。这就是为什么我使用阻塞 File.ReadAllText 而不是 File.ReadAllTextAsync。 我认为我的管道使用 ThreadPool 线程(通过默认的 TaskScheduler),但它不应该使用真正的线程吗?喜欢用Task.Factory.StartNew(..., TaskCreationOptions.LongRunning); 创建的那些?那么,它是否使用“正确的”线程?如果没有 - 如何解决?我已经看到了实施自定义 TaskScheduler 的建议,但我找不到示例。现有的似乎依赖于内部实现,所以不清楚如何进行。 我尝试为ProjectItemTextItem 生产增加MaxDegreeOfParallelism,因为这两个主要是IO,因此比最后一部分慢得多- 检查C# 文件内容。但它并没有在性能上产生太大的改进。据我了解,管道越慢,并行性就应该越高。另一方面,我不知道从 SSD 读取时可以有多少并行度。完全不清楚如何对其进行分析。

【问题讨论】:

看来这个问题太大太复杂了。最好把它分成小块,分别调查,问不同的问题 仅供参考,向TransformManyBlock 添加对IAsyncEnumerable&lt;T&gt; 的支持是微软的计划,但团队无法及时完成.NET 6,因此it has been postponed for .NET 7。目前,您可能会通过查看this 的答案获得一些关于如何超越此限制的想法。 @TheodorZoulias - 我想这个想法不是将produceCSFileText 链接到produceDependentTypes,而是让前者使用SendAsync 方法发送TextItem 对象。然后手动处理两者之间的延续传播。 【参考方案1】:

这里是 TransformManyBlock&lt;TInput,TOutput&gt; 数据流组件的直接替换,其构造函数接受 Func&lt;TInput, IAsyncEnumerable&lt;TOutput&gt;&gt; 委托。它具有所有 API 并支持所有选项(但请参阅最后关于 BoundedCapacity 的警告)。它在内部由三个链接组件组成,TransformBlockActionBlockBufferBlock

public class TransformManyBlockEx<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>

    private readonly TransformBlock<TInput, (long, TInput)> _input;
    private readonly ActionBlock<(long, TInput)> _transformer;
    private readonly BufferBlock<TOutput> _output;
    private readonly Dictionary<long, (Queue<TOutput> Queue, bool Completed)> _byIndex;
    private readonly CancellationToken _cancellationToken;
    private long currentIndex = 0L;
    private long minIndex = 0L;

    public TransformManyBlockEx(Func<TInput, IAsyncEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    
        // Arguments validation omitted
        dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;
        if (dataflowBlockOptions.EnsureOrdered)
            _byIndex = new Dictionary<long, (Queue<TOutput>, bool)>();

        _input = new TransformBlock<TInput, (long, TInput)>(item =>
        
            return (currentIndex++, item); // No parallelism here
        , new ExecutionDataflowBlockOptions()
        
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = _cancellationToken
        );

        _transformer = new ActionBlock<(long, TInput)>(async entry =>
        
            var (index, item) = entry;
            Queue<TOutput> queue = null;
            if (_byIndex != null)
            
                // EnsureOrdered is enabled
                queue = new Queue<TOutput>();
                lock (_byIndex) _byIndex.Add(index, (queue, false));
            
            var resultSequence = transform(item);
            await foreach (var result in resultSequence
                .WithCancellation(_cancellationToken))
            
                if (_byIndex != null)
                
                    lock (queue) queue.Enqueue(result);
                    if (!await SendPendingResultsAsync()) return;
                
                else
                
                    if (!await _output.SendAsync(result, _cancellationToken)) return;
                
            
            if (_byIndex != null)
            
                lock (_byIndex) _byIndex[index] = (queue, true); // Mark as completed
                await SendPendingResultsAsync();
            
        , dataflowBlockOptions);

        _input.LinkTo(_transformer, new()  PropagateCompletion = true );

        _output = new BufferBlock<TOutput>(dataflowBlockOptions);

        Task transformerPostCompletion = _transformer.Completion.ContinueWith(t =>
        
            if (_byIndex != null)
            
                int pendingCount;
                lock (_byIndex)
                
                    pendingCount = _byIndex.Count;
                    _byIndex.Clear(); // Cleanup
                
                if (t.IsCompletedSuccessfully && pendingCount > 0)
                    throw new InvalidOperationException(
                    "The transformer completed before emitting all queued results.");
            
        , TaskScheduler.Default);

        // The Task.WhenAll aggregates nicely the exceptions of the two tasks
        PropagateCompletion(
            Task.WhenAll(_transformer.Completion, transformerPostCompletion),
            _output);
    

    private static async void PropagateCompletion(Task sourceCompletion,
        IDataflowBlock target)
    
        try  await sourceCompletion.ConfigureAwait(false);  catch  
        var ex = sourceCompletion.IsFaulted ? sourceCompletion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    

    private async Task<bool> SendPendingResultsAsync()
    
        // Returns false in case the BufferBlock rejected a result
        // This may happen in case of cancellation
        while (TrySendNextPendingResult(out var sendTask))
        
            if (!await sendTask) return false;
        
        return true;
    

    private bool TrySendNextPendingResult(out Task<bool> sendTask)
    
        // Returns false in case currently there is no pending result
        sendTask = null;
        lock (_byIndex)
        
            while (true)
            
                if (!_byIndex.TryGetValue(minIndex, out var entry))
                    return false; // The next queue in not in the dictionary yet

                var (queue, completed) = entry; // We found the next queue

                lock (queue)
                
                    if (queue.TryDequeue(out var result))
                    
                        // We found the next result
                        // Send the result while holding the lock on _byIndex
                        // The BufferBlock respects the order of submited items
                        sendTask = _output.SendAsync(result, _cancellationToken);
                        return true;
                    
                

                // Currently the queue is empty
                // If it's not completed yet, return. It may have more items later.
                if (!completed) return false;

                // OK, the queue is now both empty and completed
                _byIndex.Remove(minIndex); // Remove it
                minIndex++; // Continue with the next queue in order
            
        
    

    public TransformManyBlockEx(Func<TInput, Task<IEnumerable<TOutput>>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions)  

    public TransformManyBlockEx(Func<TInput, IEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions)  

    public Task Completion => _output.Completion;
    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_input).Fault(exception);

    public int InputCount
        => _input.InputCount + _input.OutputCount + _transformer.InputCount;

    public int OutputCount
    
        get
        
            int count = _output.Count;
            if (_byIndex == null) return count;
            lock (_byIndex) return count + _byIndex.Values
                .Select(e =>  lock (e.Queue) return e.Queue.Count; ).Sum();
        
    

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
            => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<TOutput> items)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader messageHeader, TInput messageValue,
        ISourceBlock<TInput> source, bool consumeToAccept)
            => ((ITargetBlock<TInput>)_input).OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target, out bool messageConsumed)
            => ((ISourceBlock<TOutput>)_output).ConsumeMessage(messageHeader, target,
                out messageConsumed);

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
            => ((ISourceBlock<TOutput>)_output).ReserveMessage(messageHeader, target);

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
            => ((ISourceBlock<TOutput>)_output).ReleaseReservation(messageHeader,
                target);

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, Task<IEnumerable<TOutput>>> transform)
    
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        
            foreach (var result in await transform(item)) yield return result;
        
    

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, IEnumerable<TOutput>> transform)
    
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        
            foreach (var result in transform(item)) yield return result;
            await Task.CompletedTask; // Suppress CS1998
        
    

主要困难在于EnsureOrdered 选项,因为我们不能依赖任何组合块的内置功能。选择的解决方案基于来自this 问题的想法:Dictionarylong 键,结果按原始顺序存储,long 计数器保存尚未发出的最小索引。

此实现具有所有由组合构建的自定义数据流组件的常见缺点:

    异常包含在额外的AggregateExceptions 中。这通常需要Flatten 管道中最后一个块传播的异常。 BoundedCapacity 乘以链接的内部组件的数量。 TransformManyBlockEx 的有效 BoundedCapacity 是选项中传递的值的三倍,加上存储在内部重新排序缓冲区中的结果(启用 EnsureOrdered 选项时)。这个缓冲区并没有真正的限制。不幸的是,修复这个缺陷并非易事。

【讨论】:

我将不得不摸索和测试它。可能需要一些时间。但是肯定会 +1。 @mark 假设它没有错误,它可能会作为一个临时解决方案,直到微软发布真实的东西(希望在 .NET 7 中)。 @mark 我在实现中添加了一个鲁棒性功能:如果内部 transformer 块成功完成而没有发出所有排队的结果(由于一些错误),InvalidOperationException 会通过Completion 任务。 @mark 我刚刚在实现中发现了一个错误。当多个生产者同时向TransformManyBlockEx 发送项目时,它可能会死锁。它可能与ImmediateTaskScheduler 有关。我正在看。 把它发布到github上有意义吗?在微软实施之前似乎很有用。

以上是关于如何高效地处理数百个项目中的数千个 C# 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在简单的 TPL DataFlow 管道中优化性能?

如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?

使用 AFNetworking 同时处理数百个请求

Dash:如何更新回调中的数百个元素?

包含 SwiftyJSON 时返回数百个错误

GitHub monorepos 的精细读取访问权限