如何加快递归搜索功能?
Posted
技术标签:
【中文标题】如何加快递归搜索功能?【英文标题】:How do I speed up recursive search function? 【发布时间】:2014-08-03 23:41:21 【问题描述】:我编写的搜索功能的速度有问题。功能步骤如下:
-
函数以两个表名参数开始,一个起点和一个目标
然后该函数遍历表列组合列表(50,000 长)并检索与起始点表关联的所有组合。
然后该函数循环遍历每个检索到的组合,并且对于每个组合,它再次遍历表列组合列表,但这次查找与给定列匹配的表。
最后,函数循环遍历从上一步检索到的每个组合,并针对每个组合检查表是否与目标表相同;如果是,它会保存它,如果不是,它会调用自己,并从该组合中传入表名。
该功能的目的是能够跟踪表之间的链接,其中链接是直接的或具有多个分离度。递归级别是一个固定的整数值。
我的问题是,每当我尝试为两个级别的搜索深度运行此函数(现阶段不敢尝试更深入)时,作业都会耗尽内存,或者我失去耐心。我等了 17 分钟,然后作业内存不足一次。
每张表的平均列数为 28,标准差为 34。
这是一个图表,显示了可以在表之间建立的各种链接的示例:
这是我的代码:
private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode parentNode, string targetTable, int maxSearchDepth)
if (parentNode.Level < maxSearchDepth)
IEnumerable<string> tableColumns = sourceList.Where(x => x.Table.Equals(parentNode.Table)).Select(x => x.Column);
foreach (string sourceColumn in tableColumns)
string shortName = sourceColumn.Substring(1);
IEnumerable<TableSearchNode> tables = sourceList.Where(
x => x.Column.Substring(1).Equals(shortName) && !x.Table.Equals(parentNode.Table) && !parentNode.Ancenstory.Contains(x.Table)).Select(
x => new TableSearchNode Table = x.Table, Column = x.Column, Level = parentNode.Level + 1 );
foreach (TableSearchNode table in tables)
parentNode.AddChildNode(sourceColumn, table);
if (!table.Table.Equals(targetTable))
FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
else
table.NotifySeachResult(true);
编辑:分离出 TableSearchNode 逻辑并添加属性和方法以确保完整性
//TableSearchNode
public Dictionary<string, List<TableSearchNode>> Children get; private set;
//TableSearchNode
public List<string> Ancenstory
get
Stack<string> ancestory = new Stack<string>();
TableSearchNode ancestor = ParentNode;
while (ancestor != null)
ancestory.Push(ancestor.tbl);
ancestor = ancestor.ParentNode;
return ancestory.ToList();
//TableSearchNode
public void AddChildNode(string referenceColumn, TableSearchNode childNode)
childNode.ParentNode = this;
List<TableSearchNode> relatedTables = null;
Children.TryGetValue(referenceColumn, out relatedTables);
if (relatedTables == null)
relatedTables = new List<TableSearchNode>();
Children.Add(referenceColumn, relatedTables);
relatedTables.Add(childNode);
提前感谢您的帮助!
【问题讨论】:
这可能会有所帮助,如果我没记错的话,尾部调用不会淹没堆栈——但我可能大错特错。 blogs.msdn.com/b/clrcodegeneration/archive/2009/05/11/…(免责声明 - 我是商科专业的) 我正在更详细地研究这一点,但有一条评论是,如果性能是一个大问题,您可能需要考虑删除所有 LINQ 调用。 @nicholas 出于兴趣,您从哪里读到过 LINQ 必然比替代方案慢? @EricScherrer 非常感谢您提供的链接。不幸的是,因为每个函数都可以在一个循环中多次调用自己,我发现我无法模仿这些示例; 然而,我将if (!table.Table.Equals(targetTable))
更改为if (table.Table.Equals(targetTable))
并交换了内部语句,因此递归调用是最后一个语句。我不确定这在执行时如何转化。如果你有更好的方法,请告诉我。顺便说一句,我已经在所有可能的方面都犯了可怕的错误(进程吃了 1.5gb 的 RAM),所以别担心 :)
您正在浪费时间以所有这些小方法优化代码。您将获得持续的加速,但即使是 10 倍也无济于事。您需要一种具有较低渐近成本的全新算法。
【参考方案1】:
你真的浪费了很多内存。立即想到什么:
首先将传入的List<TableColumns> sourceList
替换为ILookup<string, TableColumns>
。你应该在调用FindLinkingTables
之前这样做一次:
ILookup<string, TableColumns> sourceLookup = sourceList.ToLookup(s => s.Table);
FindLinkingTables(sourceLookup, parentNode, targetTable, maxSearchDepth);
如果不是真的需要,不要打电话给.ToList()
。例如,如果您只想枚举结果列表的所有子项,则不需要它。所以你的 main 函数看起来像这样:
private void FindLinkingTables(ILookup<string, TableColumns> sourceLookup, TableSearchNode parentNode, string targetTable, int maxSearchDepth)
if (parentNode.Level < maxSearchDepth)
var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
foreach (string sourceColumn in tableColumns)
string shortName = sourceColumn.Substring(1);
var tables = sourceLookup
.Where(
group => !group.Key.Equals(parentNode.Table)
&& !parentNode.Ancenstory.Contains(group.Key))
.SelectMany(group => group)
.Where(tableColumn => tableColumn.Column.Substring(1).Equals(shortName))
.Select(
x => new TableSearchNode
Table = x.Table,
Column = x.Column,
Level = parentNode.Level + 1
);
foreach (TableSearchNode table in tables)
parentNode.AddChildNode(sourceColumn, table);
if (!table.Table.Equals(targetTable))
FindLinkingTables(sourceLookup, table, targetTable, maxSearchDepth);
else
table.NotifySeachResult(true);
[编辑]
另外为了加快剩余复杂的LINQ查询,你可以再准备一个ILookup
:
ILookup<string, TableColumns> sourceColumnLookup = sourceLlist
.ToLookup(t => t.Column.Substring(1));
//...
private void FindLinkingTables(
ILookup<string, TableColumns> sourceLookup,
ILookup<string, TableColumns> sourceColumnLookup,
TableSearchNode parentNode, string targetTable, int maxSearchDepth)
if (parentNode.Level >= maxSearchDepth) return;
var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
foreach (string sourceColumn in tableColumns)
string shortName = sourceColumn.Substring(1);
var tables = sourceColumnLookup[shortName]
.Where(tableColumn => !tableColumn.Table.Equals(parentNode.Table)
&& !parentNode.AncenstoryReversed.Contains(tableColumn.Table))
.Select(
x => new TableSearchNode
Table = x.Table,
Column = x.Column,
Level = parentNode.Level + 1
);
foreach (TableSearchNode table in tables)
parentNode.AddChildNode(sourceColumn, table);
if (!table.Table.Equals(targetTable))
FindLinkingTables(sourceLookup, sourceColumnLookup, table, targetTable, maxSearchDepth);
else
table.NotifySeachResult(true);
我已经检查了您的 Ancestory
属性。如果IEnumerable<string>
足以满足您的需求,请检查此实现:
public IEnumerable<string> AncenstoryEnum
get return AncenstoryReversed.Reverse();
public IEnumerable<string> AncenstoryReversed
get
TableSearchNode ancestor = ParentNode;
while (ancestor != null)
yield return ancestor.tbl;
ancestor = ancestor.ParentNode;
【讨论】:
我修改了代码以使用IEnummerable
而不是List
,从而不再需要ToList()
。我会尝试ILookup
。谢谢。
进行两次查找是个好主意。我会试试的。
还检查Ancenstory
属性的其他可能实现。 AncenstoryReversed
枚举所有父母直到顶部,不需要临时缓冲区。但是AncenstoryEnum
当然会使用额外的存储空间(它是在LINQ
Reverse
函数中创建的Array
)
以前从未见过产量。谢谢!
哦,我错过了您在每个递归级别上调用 Ancenstory 属性的事实......好吧,我已经更正了 3. 点下列出的代码,因此它重用了 AncenstoryReversed 属性。至少不会有额外的内存成本......但整体逻辑看起来很可疑,也许你应该考虑其他方法来组合结果(记住这里提到的所有性能建议)。【参考方案2】:
我已设法将您的 FindLinkingTables
代码重构为:
private void FindLinkingTables(
List<TableColumns> sourceList, TableSearchNode parentNode,
string targetTable, int maxSearchDepth)
if (parentNode.Level < maxSearchDepth)
var sames = sourceList.Where(w => w.Table == parentNode.Table);
var query =
from x in sames
join y in sames
on x.Column.Substring(1) equals y.Column.Substring(1)
where !parentNode.Ancenstory.Contains(y.Table)
select new TableSearchNode
Table = x.Table,
Column = x.Column,
Level = parentNode.Level + 1
;
foreach (TableSearchNode z in query)
parentNode.AddChildNode(z.Column, z);
if (z.Table != targetTable)
FindLinkingTables(sourceList, z, targetTable, maxSearchDepth);
else
z.NotifySeachResult(true);
在我看来,您在查询的where !parentNode.Ancenstory.Contains(y.Table)
部分中的逻辑有缺陷。我认为你需要在这里重新考虑你的搜索操作,看看你想出了什么。
【讨论】:
我添加了更多代码来展示如何将“祖先”构建到节点中。无论如何,我会审查它,看看是否有任何问题。调试是我的朋友!【参考方案3】:看看这个源方法,有几件事让我印象深刻:
在您的Where
子句中,您调用parentNode.Ancenstory
;这本身具有对数运行时间,然后您在它返回的List<string>
上调用.Contains
,这是另一个对数调用(它是线性的,但列表具有对数个元素)。
您在这里所做的是检查图表中的周期。这些成本可以通过向TableColumns.Table
添加一个字段来保持不变,该字段存储有关算法如何处理Table
的信息(或者,您可以使用Dictionary<Table, int>
,以避免向对象添加字段)。通常,在 DFS 算法中,此字段为 White、Grey 或 Black - White 表示未处理(您之前没有见过 Table
),Gray 表示当前正在处理的 Table
的祖先,Black 表示当您处理完 Table
及其所有子项时。要更新您的代码以执行此操作,它看起来像:
foreach (string sourceColumn in tableColumns)
string shortName = sourceColumn.Substring(1);
IEnumerable<TableSearchNode> tables =
sourceList.Where(x => x.Column[0].Equals(shortName) &&
x.Color == White)
.Select(x => new TableSearchNode
Table = x.Table,
Column = x.Column,
Level = parentNode.Level + 1
);
foreach (TableSearchNode table in tables)
parentNode.AddChildNode(sourceColumn, table);
table.Color = Grey;
if (!table.Table.Equals(targetTable))
FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
else
table.NotifySeachResult(true);
table.Color = Black;
如上所述,您的内存不足。最简单的解决方法是删除递归调用(充当隐式堆栈)并将其替换为显式 Stack
数据结构,从而删除递归。此外,这会将递归更改为循环,而 C# 更擅长优化。
private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode root, string targetTable, int maxSearchDepth)
Stack<TableSearchNode> stack = new Stack<TableSearchNode>();
TableSearchNode current;
stack.Push(root);
while (stack.Count > 0 && stack.Count < maxSearchDepth)
current = stack.Pop();
var tableColumns = sourceList.Where(x => x.Table.Equals(current.Table))
.Select(x => x.Column);
foreach (string sourceColumn in tableColumns)
string shortName = sourceColumn.Substring(1);
IEnumerable<TableSearchNode> tables =
sourceList.Where(x => x.Column[0].Equals(shortName) &&
x.Color == White)
.Select(x => new TableSearchNode
Table = x.Table,
Column = x.Column,
Level = current.Level + 1
);
foreach (TableSearchNode table in tables)
current.AddChildNode(sourceColumn, table);
if (!table.Table.Equals(targetTable))
table.Color = Grey;
stack.Push(table);
else
// you could go ahead and construct the ancestry list here using the stack
table.NotifySeachResult(true);
return;
current.Color = Black;
最后,我们不知道Table.Equals
的成本有多大,但如果比较深入,那么可能会为您的内部循环增加大量运行时间。
【讨论】:
这里有很多可以尝试和吸收的东西。我查了DFS,因为我以前没有学过。我想进一步探索这个选项。我想我也应该开始重用节点以节省内存。让我看看我是否可以通过你的例子。谢谢。 @Sinker Wikipedia 始终是开始学习算法的好地方:en.wikipedia.org/wiki/Depth-first_search【参考方案4】:好的,这是一个基本上放弃您发布的所有代码的答案。
首先,您应该使用您的 List<TableColumns>
并将它们散列成可以被索引的东西,而无需遍历整个列表。
为此,我编写了一个名为TableColumnIndexer
的类:
class TableColumnIndexer
Dictionary<string, HashSet<string>> tables = new Dictionary<string, HashSet<string>>();
public void Add(string tableName, string columnName)
this.Add(new TableColumns Table = tableName, Column = columnName );
public void Add(TableColumns tableColumns)
if(! tables.ContainsKey(tableColumns.Table))
tables.Add(tableColumns.Table, new HashSet<string>());
tables[tableColumns.Table].Add(tableColumns.Column);
// .... More code to follow
现在,一旦您将所有 Table / Column 值注入到此索引类中,您就可以调用递归方法来检索两个表之间的最短祖先链接。这里的实现有些草率,但为了清楚地说明此时的性能而编写它:
// .... continuation of TableColumnIndexer class
public List<string> GetShortestAncestry(string parentName, string targetName, int maxDepth)
return GetSortestAncestryR(parentName, targetName, maxDepth - 1, 0, new Dictionary<string,int>());
private List<string> GetSortestAncestryR(string currentName, string targetName, int maxDepth, int currentDepth, Dictionary<string, int> vistedTables)
// Check if we have visited this table before
if (!vistedTables.ContainsKey(currentName))
vistedTables.Add(currentName, currentDepth);
// Make sure we have not visited this table at a shallower depth before
if (vistedTables[currentName] < currentDepth)
return null;
else
vistedTables[currentName] = currentDepth;
if (currentDepth <= maxDepth)
List<string> result = new List<string>();
// First check if the current table contains a reference to the target table
if (tables[currentName].Contains(targetName))
result.Add(currentName);
result.Add(targetName);
return result;
// If not try to see if any of the children tables have the target table
else
List<string> bestResult = null;
int bestDepth = int.MaxValue;
foreach (string childTable in tables[currentName])
var tempResult = GetSortestAncestryR(childTable, targetName, maxDepth, currentDepth + 1, vistedTables);
// Keep only the shortest path found to the target table
if (tempResult != null && tempResult.Count < bestDepth)
bestDepth = tempResult.Count;
bestResult = tempResult;
// Take the best link we found and add it to the result list
if (bestDepth < int.MaxValue && bestResult != null)
result.Add(currentName);
result.AddRange(bestResult);
return result;
// If we did not find any result, return nothing
else
return null;
else
return null;
现在所有这些代码只是最短路径算法的(有点冗长)实现,它允许源表和目标表之间的循环路径和多条路径。请注意,如果两个表之间有两条深度相同的路由,算法将只选择一个(不一定是可预测的)。
【讨论】:
以上是关于如何加快递归搜索功能?的主要内容,如果未能解决你的问题,请参考以下文章