如何加快递归搜索功能?

Posted

技术标签:

【中文标题】如何加快递归搜索功能?【英文标题】:How do I speed up recursive search function? 【发布时间】:2014-08-03 23:41:21 【问题描述】:

我编写的搜索功能的速度有问题。功能步骤如下:

    函数以两个表名参数开始,一个起点和一个目标 然后该函数遍历表列组合列表(50,000 长)并检索与起始点表关联的所有组合。 然后该函数循环遍历每个检索到的组合,并且对于每个组合,它再次遍历表列组合列表,但这次查找与给定列匹配的表。 最后,函数循环遍历从上一步检索到的每个组合,并针对每个组合检查表是否与目标表相同;如果是,它会保存它,如果不是,它会调用自己,并从该组合中传入表名。

该功能的目的是能够跟踪表之间的链接,其中链接是直接的或具有多个分离度。递归级别是一个固定的整数值。

我的问题是,每当我尝试为两个级别的搜索深度运行此函数(现阶段不敢尝试更深入)时,作业都会耗尽内存,或者我失去耐心。我等了 17 分钟,然后作业内存不足一次。

每张表的平均列数为 28,标准差为 34。

这是一个图表,显示了可以在表之间建立的各种链接的示例:

这是我的代码:

private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode parentNode, string targetTable, int maxSearchDepth)

    if (parentNode.Level < maxSearchDepth)
    
        IEnumerable<string> tableColumns = sourceList.Where(x => x.Table.Equals(parentNode.Table)).Select(x => x.Column);

        foreach (string sourceColumn in tableColumns)
        
            string shortName = sourceColumn.Substring(1);

            IEnumerable<TableSearchNode> tables = sourceList.Where(
                x => x.Column.Substring(1).Equals(shortName) && !x.Table.Equals(parentNode.Table) && !parentNode.Ancenstory.Contains(x.Table)).Select(
                    x => new TableSearchNode  Table = x.Table, Column = x.Column, Level = parentNode.Level + 1 );
            foreach (TableSearchNode table in tables)
            
                parentNode.AddChildNode(sourceColumn, table);
                if (!table.Table.Equals(targetTable))
                
                    FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
                
                else
                
                    table.NotifySeachResult(true);
                
            
        
    

编辑:分离出 TableSearchNode 逻辑并添加属性和方法以确保完整性

//TableSearchNode
public Dictionary<string, List<TableSearchNode>> Children  get; private set; 

//TableSearchNode
public List<string> Ancenstory

    get
    
        Stack<string> ancestory = new Stack<string>();
        TableSearchNode ancestor = ParentNode;
        while (ancestor != null)
        
            ancestory.Push(ancestor.tbl);
            ancestor = ancestor.ParentNode;
        
        return ancestory.ToList();
    


//TableSearchNode
public void AddChildNode(string referenceColumn, TableSearchNode childNode)
    
        childNode.ParentNode = this;
        List<TableSearchNode> relatedTables = null;
        Children.TryGetValue(referenceColumn, out relatedTables);
        if (relatedTables == null)
        
            relatedTables = new List<TableSearchNode>();
            Children.Add(referenceColumn, relatedTables);
        
        relatedTables.Add(childNode);
    

提前感谢您的帮助!

【问题讨论】:

这可能会有所帮助,如果我没记错的话,尾部调用不会淹没堆栈——但我可能大错特错。 blogs.msdn.com/b/clrcodegeneration/archive/2009/05/11/…(免责声明 - 我是商科专业的) 我正在更详细地研究这一点,但有一条评论是,如果性能是一个大问题,您可能需要考虑删除所有 LINQ 调用。 @nicholas 出于兴趣,您从哪里读到过 LINQ 必然比替代方案慢? @EricScherrer 非常感谢您提供的链接。不幸的是,因为每个函数都可以在一个循环中多次调用自己,我发现我无法模仿这些示例; 然而,我将if (!table.Table.Equals(targetTable)) 更改为if (table.Table.Equals(targetTable)) 并交换了内部语句,因此递归调用是最后一个语句。我不确定这在执行时如何转化。如果你有更好的方法,请告诉我。顺便说一句,我已经在所有可能的方面都犯了可怕的错误(进程吃了 1.5gb 的 RAM),所以别担心 :) 您正在浪费时间以所有这些小方法优化代码。您将获得持续的加速,但即使是 10 倍也无济于事。您需要一种具有较低渐近成本的全新算法。 【参考方案1】:

你真的浪费了很多内存。立即想到什么:

    首先将传入的List&lt;TableColumns&gt; sourceList 替换为ILookup&lt;string, TableColumns&gt;。你应该在调用FindLinkingTables之前这样做一次:

    ILookup<string, TableColumns> sourceLookup = sourceList.ToLookup(s => s.Table);
    FindLinkingTables(sourceLookup, parentNode, targetTable, maxSearchDepth);
    

    如果不是真的需要,不要打电话给.ToList()。例如,如果您只想枚举结果列表的所有子项,则不需要它。所以你的 main 函数看起来像这样:

    private void FindLinkingTables(ILookup<string, TableColumns> sourceLookup, TableSearchNode parentNode, string targetTable, int maxSearchDepth)
    
        if (parentNode.Level < maxSearchDepth)
        
            var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
    
            foreach (string sourceColumn in tableColumns)
            
                string shortName = sourceColumn.Substring(1);
    
                var tables = sourceLookup
                    .Where(
                        group => !group.Key.Equals(parentNode.Table)
                                 && !parentNode.Ancenstory.Contains(group.Key))
                    .SelectMany(group => group)
                    .Where(tableColumn => tableColumn.Column.Substring(1).Equals(shortName))
                    .Select(
                        x => new TableSearchNode
                        
                            Table = x.Table,
                            Column = x.Column,
                            Level = parentNode.Level + 1
                        );
    
                foreach (TableSearchNode table in tables)
                
                    parentNode.AddChildNode(sourceColumn, table);
                    if (!table.Table.Equals(targetTable))
                    
                        FindLinkingTables(sourceLookup, table, targetTable, maxSearchDepth);
                    
                    else
                    
                        table.NotifySeachResult(true);
                    
                
            
        
    
    

    [编辑]

    另外为了加快剩余复杂的LINQ查询,你可以再准备一个ILookup

    ILookup<string, TableColumns> sourceColumnLookup = sourceLlist
            .ToLookup(t => t.Column.Substring(1));
    
    //...
    
    private void FindLinkingTables(
        ILookup<string, TableColumns> sourceLookup, 
        ILookup<string, TableColumns> sourceColumnLookup,
        TableSearchNode parentNode, string targetTable, int maxSearchDepth)
    
        if (parentNode.Level >= maxSearchDepth) return;
    
        var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
    
        foreach (string sourceColumn in tableColumns)
        
            string shortName = sourceColumn.Substring(1);
    
            var tables = sourceColumnLookup[shortName]
                .Where(tableColumn => !tableColumn.Table.Equals(parentNode.Table)
                                      && !parentNode.AncenstoryReversed.Contains(tableColumn.Table))
                .Select(
                    x => new TableSearchNode
                        
                            Table = x.Table,
                            Column = x.Column,
                            Level = parentNode.Level + 1
                        );
    
    
            foreach (TableSearchNode table in tables)
            
                parentNode.AddChildNode(sourceColumn, table);
                if (!table.Table.Equals(targetTable))
                
                    FindLinkingTables(sourceLookup, sourceColumnLookup, table, targetTable, maxSearchDepth);
                
                else
                
                    table.NotifySeachResult(true);
                
            
        
    
    

    我已经检查了您的 Ancestory 属性。如果IEnumerable&lt;string&gt; 足以满足您的需求,请检查此实现:

    public IEnumerable<string> AncenstoryEnum
    
        get  return AncenstoryReversed.Reverse(); 
    
    
    public IEnumerable<string> AncenstoryReversed
    
        get
        
            TableSearchNode ancestor = ParentNode;
            while (ancestor != null)
            
                yield return ancestor.tbl;
                ancestor = ancestor.ParentNode;
            
        
    
    

【讨论】:

我修改了代码以使用IEnummerable 而不是List,从而不再需要ToList()。我会尝试ILookup。谢谢。 进行两次查找是个好主意。我会试试的。 还检查Ancenstory 属性的其他可能实现。 AncenstoryReversed 枚举所有父母直到顶部,不需要临时缓冲区。但是AncenstoryEnum 当然会使用额外的存储空间(它是在LINQ Reverse 函数中创建的Array 以前从未见过产量。谢谢! 哦,我错过了您在每个递归级别上调用 Ancenstory 属性的事实......好吧,我已经更正了 3. 点下列出的代码,因此它重用了 AncenstoryReversed 属性。至少不会有额外的内存成本......但整体逻辑看起来很可疑,也许你应该考虑其他方法来组合结果(记住这里提到的所有性能建议)。【参考方案2】:

我已设法将您的 FindLinkingTables 代码重构为:

private void FindLinkingTables(
    List<TableColumns> sourceList, TableSearchNode parentNode,
    string targetTable, int maxSearchDepth)

    if (parentNode.Level < maxSearchDepth)
    
        var sames = sourceList.Where(w => w.Table == parentNode.Table);

        var query =
            from x in sames
            join y in sames
                on x.Column.Substring(1) equals y.Column.Substring(1)
            where !parentNode.Ancenstory.Contains(y.Table)
            select new TableSearchNode
            
                Table = x.Table,
                Column = x.Column,
                Level = parentNode.Level + 1
            ;

        foreach (TableSearchNode z in query)
        
            parentNode.AddChildNode(z.Column, z);
            if (z.Table != targetTable)
            
                FindLinkingTables(sourceList, z, targetTable, maxSearchDepth);
            
            else
            
                z.NotifySeachResult(true);
            
        
    

在我看来,您在查询的where !parentNode.Ancenstory.Contains(y.Table) 部分中的逻辑有缺陷。我认为你需要在这里重新考虑你的搜索操作,看看你想出了什么。

【讨论】:

我添加了更多代码来展示如何将“祖先”构建到节点中。无论如何,我会审查它,看看是否有任何问题。调试是我的朋友!【参考方案3】:

看看这个源方法,有几件事让我印象深刻:

    在您的Where 子句中,您调用parentNode.Ancenstory;这本身具有对数运行时间,然后您在它返回的List&lt;string&gt; 上调用.Contains,这是另一个对数调用(它是线性的,但列表具有对数个元素)。 您在这里所做的是检查图表中的周期。这些成本可以通过向TableColumns.Table 添加一个字段来保持不变,该字段存储有关算法如何处理Table 的信息(或者,您可以使用Dictionary&lt;Table, int&gt;,以避免向对象添加字段)。通常,在 DFS 算法中,此字段为 White、Grey 或 Black - White 表示未处理(您之前没有见过 Table),Gray 表示当前正在处理的 Table 的祖先,Black 表示当您处理完 Table 及其所有子项时。要更新您的代码以执行此操作,它看起来像:

    foreach (string sourceColumn in tableColumns)
    
        string shortName = sourceColumn.Substring(1);
    
        IEnumerable<TableSearchNode> tables =
            sourceList.Where(x => x.Column[0].Equals(shortName) &&
                                  x.Color == White)
                      .Select(x => new TableSearchNode
                                       
                                            Table = x.Table,
                                            Column = x.Column,
                                            Level = parentNode.Level + 1
                                        );
        foreach (TableSearchNode table in tables)
        
            parentNode.AddChildNode(sourceColumn, table);
    
            table.Color = Grey;
    
            if (!table.Table.Equals(targetTable))
            
                FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
            
            else
            
                table.NotifySeachResult(true);
            
    
            table.Color = Black;
        
    
    

    如上所述,您的内存不足。最简单的解决方法是删除递归调用(充当隐式堆栈)并将其替换为显式 Stack 数据结构,从而删除递归。此外,这会将递归更改为循环,而 C# 更擅长优化。

    private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode root, string targetTable, int maxSearchDepth)
    
        Stack<TableSearchNode> stack = new Stack<TableSearchNode>();
        TableSearchNode current;
    
        stack.Push(root);
    
        while (stack.Count > 0 && stack.Count < maxSearchDepth)
        
            current = stack.Pop();
    
            var tableColumns = sourceList.Where(x => x.Table.Equals(current.Table))
                                         .Select(x => x.Column);
    
            foreach (string sourceColumn in tableColumns)
            
                string shortName = sourceColumn.Substring(1);
    
                IEnumerable<TableSearchNode> tables =
                    sourceList.Where(x => x.Column[0].Equals(shortName) &&
                                          x.Color == White)
                              .Select(x => new TableSearchNode
                                               
                                                    Table = x.Table,
                                                    Column = x.Column,
                                                    Level = current.Level + 1
                                                );
                foreach (TableSearchNode table in tables)
                
                    current.AddChildNode(sourceColumn, table);
    
                    if (!table.Table.Equals(targetTable))
                    
                        table.Color = Grey;
                        stack.Push(table);
                    
                    else
                    
                        // you could go ahead and construct the ancestry list here using the stack
                        table.NotifySeachResult(true);
                        return;
                    
                
            
    
            current.Color = Black;
    
        
    
    

    最后,我们不知道Table.Equals 的成本有多大,但如果比较深入,那么可能会为您的内部循环增加大量运行时间。

【讨论】:

这里有很多可以尝试和吸收的东西。我查了DFS,因为我以前没有学过。我想进一步探索这个选项。我想我也应该开始重用节点以节省内存。让我看看我是否可以通过你的例子。谢谢。 @Sinker Wikipedia 始终是开始学习算法的好地方:en.wikipedia.org/wiki/Depth-first_search【参考方案4】:

好的,这是一个基本上放弃您发布的所有代码的答案。

首先,您应该使用您的 List&lt;TableColumns&gt; 并将它们散列成可以被索引的东西,而无需遍历整个列表。

为此,我编写了一个名为TableColumnIndexer的类:

class TableColumnIndexer

    Dictionary<string, HashSet<string>> tables = new Dictionary<string, HashSet<string>>();

    public void Add(string tableName, string columnName)
    
        this.Add(new TableColumns  Table = tableName, Column = columnName );
    

    public void Add(TableColumns tableColumns)
    
        if(! tables.ContainsKey(tableColumns.Table))
        
            tables.Add(tableColumns.Table, new HashSet<string>());
        

        tables[tableColumns.Table].Add(tableColumns.Column);
    

    // .... More code to follow

现在,一旦您将所有 Table / Column 值注入到此索引类中,您就可以调用递归方法来检索两个表之间的最短祖先链接。这里的实现有些草率,但为了清楚地说明此时的性能而编写它:

    // .... continuation of TableColumnIndexer class
    public List<string> GetShortestAncestry(string parentName, string targetName, int maxDepth)
    
        return GetSortestAncestryR(parentName, targetName, maxDepth - 1, 0, new Dictionary<string,int>());
    

    private List<string> GetSortestAncestryR(string currentName, string targetName, int maxDepth, int currentDepth, Dictionary<string, int> vistedTables)
    
        // Check if we have visited this table before
        if (!vistedTables.ContainsKey(currentName))
            vistedTables.Add(currentName, currentDepth);

        // Make sure we have not visited this table at a shallower depth before
        if (vistedTables[currentName] < currentDepth)
            return null;
        else
            vistedTables[currentName] = currentDepth;


        if (currentDepth <= maxDepth)
        
            List<string> result = new List<string>();

            // First check if the current table contains a reference to the target table
            if (tables[currentName].Contains(targetName))
            
                result.Add(currentName);
                result.Add(targetName);
                return result;
            
            // If not try to see if any of the children tables have the target table
            else
            
                List<string> bestResult = null;
                    int bestDepth = int.MaxValue;

                foreach (string childTable in tables[currentName])
                
                    var tempResult = GetSortestAncestryR(childTable, targetName, maxDepth, currentDepth + 1, vistedTables);

                    // Keep only the shortest path found to the target table
                    if (tempResult != null && tempResult.Count < bestDepth)
                    
                        bestDepth = tempResult.Count;
                        bestResult = tempResult;
                    
                

                // Take the best link we found and add it to the result list
                if (bestDepth < int.MaxValue && bestResult != null)
                
                    result.Add(currentName);
                    result.AddRange(bestResult);
                    return result;
                
                // If we did not find any result, return nothing
                else
                
                    return null;
                
            
        
        else
        
            return null;
        
    

现在所有这些代码只是最短路径算法的(有点冗长)实现,它允许源表和目标表之间的循环路径和多条路径。请注意,如果两个表之间有两条深度相同的路由,算法将只选择一个(不一定是可预测的)。

【讨论】:

以上是关于如何加快递归搜索功能?的主要内容,如果未能解决你的问题,请参考以下文章

portal商品展示功能逻辑

ecshop二次开发功能插件计划列表

Safari很卡?教你开启iOS11的隐藏功能,加快上网速度

如何使用资产库加快图像加载速度?

如何加快 Spark SQL 单元测试?

PySpark 递归键搜索