用于处理大量数据(100 万条记录及更多)的数据结构和技术
Posted
技术标签:
【中文标题】用于处理大量数据(100 万条记录及更多)的数据结构和技术【英文标题】:Data Structures & Techniques for operating on large data volumes (1 mln. recs and more) 【发布时间】:2015-01-25 13:15:25 【问题描述】:我一直在开发的一个 WPF .NET 4.5 应用程序,最初用于处理小数据量,现在可以处理超过 100 万的更大数据量,当然我开始耗尽内存。数据来自 MS SQL 数据库,数据处理需要加载到本地数据结构,因为这些数据随后由 CLR 中的代码转换/处理/引用,因此需要连续和不间断的数据访问,但并非所有数据都有立即加载到内存中,但仅在实际访问时才加载。作为一个小例子,反距离插值器使用此数据生成插值地图,所有数据都需要传递给它以生成连续网格。
我已经重写了应用程序的某些部分来处理数据,例如在任何给定时间只加载 x 数量的行,并实现一个滑动窗口方法来处理数据。但是,为应用程序的其余部分执行此操作将需要一些时间投入,我想知道是否有更强大和标准的方法来解决这个设计问题(必须有,我不是第一个)?
tldr; C# 是否提供任何数据结构或技术以中断方式访问大量数据,因此它的行为类似于 IEnumerable 但数据在实际访问或需要之前不在内存中,还是完全由我来管理内存使用?我的理想是一种结构,它可以自动实现类似缓冲区的机制,并在访问数据时加载更多数据,并从已访问且不再感兴趣的数据中释放内存。可能像一些带有内部缓冲区的 DataTable?
【问题讨论】:
也许.NET 是错误的工具?看看SSIS。另外,仅供参考,拥有任何此类工具的是 .NET,而不是 C#。 你是否规范化内存中的数据?即在您的数据结构中? 你能给个***.com/help/mcve吗? 当然,感谢@JohnSaunders 的更正。好点,但是我在映射和生成网格时使用的库是用 .NET 编写的。我认为这就是您对 SSIS 的建议的来源——停止在 .NET 中执行此操作并在 SQL 中执行?但是我在 NET 中有库,我需要将这些大数据存储到其中,SSIS 可能对此无济于事,不是吗? @Grantly,谢谢,您是在问我是否可以规范化需要访问以节省内存的数据,还是在访问时询问我的数据是否经过规范化?当我访问数据时,它已经被规范化了,我只需要将它传递给库以供使用,这样他们就可以使用这些数据进行处理。希望这能回答您的问题。 【参考方案1】:就迭代一个太大而无法放入内存的非常大的数据集而言,您可以使用生产者-消费者模型。我在处理包含数十亿条记录的自定义数据集时使用了类似的东西——总共大约 2 TB 的数据。
这个想法是有一个包含生产者和消费者的类。当您创建该类的新实例时,它会启动一个填充受约束并发队列的生产者线程。并且该线程使队列保持满。消费者部分是让您获取下一条记录的 API。
您从共享并发队列开始。为此,我喜欢 .NET BlockingCollection。
这是一个读取文本文件并维护 10,000 个文本行的队列的示例。
public class TextFileLineBuffer
private const int QueueSize = 10000;
private BlockingCollection<string> _buffer = new BlockingCollection<string>(QueueSize);
private CancellationTokenSource _cancelToken;
private StreamReader reader;
public TextFileLineBuffer(string filename)
// File is opened here so that any exception is thrown on the calling thread.
_reader = new StreamReader(filename);
_cancelToken = new CancellationTokenSource();
// start task that reads the file
Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
public string GetNextLine()
if (_buffer.IsCompleted)
// The buffer is empty because the file has been read
// and all lines returned.
// You can either call this an error and throw an exception,
// or you can return null.
return null;
// If there is a record in the buffer, it is returned immediately.
// Otherwise, Take does a non-busy wait.
// You might want to catch the OperationCancelledException here and return null
// rather than letting the exception escape.
return _buffer.Take(_cancelToken.Token);
private void ProcessFile()
while (!_reader.EndOfStream && !_cancelToken.Token.IsCancellationRequested)
var line = _reader.ReadLine();
try
// This will block if the buffer already contains QueueSize records.
// As soon as a space becomes available, this will add the record
// to the buffer.
_buffer.Add(line, _cancelToken.Token);
catch (OperationCancelledException)
;
_buffer.CompleteAdding();
public void Cancel()
_cancelToken.Cancel();
这就是它的基本原理。您需要添加一个 Dispose 方法,以确保线程终止并关闭文件。
我已经在许多不同的程序中使用了这种基本方法,效果很好。您必须进行一些分析和测试以确定您的应用程序的最佳缓冲区大小。您需要足够大的东西来跟上正常的数据流并处理突发的活动,但又不能大到超出您的内存预算。
IEnumerable 修改
如果你想支持IEnumerable<T>
,你必须做一些小的修改。我将扩展我的示例以支持IEnumerable<String>
。
首先,您必须更改类声明:
public class TextFileLineBuffer: IEnumerable<string>
那么,你必须实现GetEnumerator:
public IEnumerator<String> GetEnumerator()
foreach (var s in _buffer.GetConsumingEnumerable())
yield return s;
IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
这样,您可以初始化事物,然后将其传递给任何需要IEnumerable<string>
的代码。于是就变成了:
var items = new TextFileLineBuffer(filename);
DoSomething(items);
void DoSomething(IEnumerable<string> list)
foreach (var s in list)
Console.WriteLine(s);
【讨论】:
感谢您解释生产者和消费者模型并分享工作代码示例。干净整洁。但是,是否可以扩展消费者部分,使其无需显式调用 GetNextLine 方法即可工作?我的消费者(方法调用)需要一个 IEnumerable 并且我不能修改方法的内部结构,因此不能调用 GetNextLine() 除非代表我调用它,例如覆盖消费者方法无论如何都会调用的方法,例如在自定义 IEnumerable 上使用自定义 IEnumerator(并覆盖其 MoveNext())?问候! @Sergey:支持IEnumerable<T>
所需的修改非常简单。查看我的更新。
非常感谢@Jim,从我能读到的内容来看,这个实现应该工作得很好,当我下周回到这个问题时,我会给它一个实际的旋转。这种数据访问模型的扩展性很好,例如,我可以在带有 EF 的大型数据库上使用它。当两个独立的“东西”放在一起时,看看一个人能想出什么是非常有趣的!【参考方案2】:
@Sergey 生产者-消费者模型可能是您最安全的解决方案(由 Jim Mischel 提出),以实现完全的可扩展性。
但是,如果您要为大象增加空间(使用非常适合的视觉隐喻),那么动态压缩是一个可行的选择。使用时解压缩,使用后丢弃,将核心数据结构压缩在内存中。显然它取决于数据——它有多少适合压缩,但在大多数数据结构中都有大量的空间。如果您对某些元数据有 ON 和 OFF 标志,这可以隐藏在 16/32 位数的未使用位中,或者至少保存在位而不是字节中;使用 16 位整数表示 lat / longs,并在使用前将每个整数转换为实数;可以使用 winzip 类型库压缩字符串 - 或编制索引,以便只保存一份副本,并且内存中不存在重复项等......
动态减压(尽管是定制的)可以闪电般快速。
我承认,整个过程可能非常费力,但随着大象的成长,绝对可以让房间保持足够大 - 在 某些 实例中。 (当然,如果数据只是无限增长,可能永远都不够好)
编辑:关于任何来源... 嗨@Sergey,我希望我能!真的!我已经使用这种技术进行数据压缩,实际上整个事情都是在白板上定制设计的,涉及一两个编码器。 它当然不是(全部)火箭科学,但它很好地全面了解所有数据的性质,然后您知道(例如)某个数字永远不会超过 9999,那么您可以选择如何将其存储在最低限度位,然后将剩余的位(假设为 32 位存储)分配给其他值。 (一个现实世界的例子是一个人的手指数量......粗略地说,你可以将上限设置为 8 或 10,虽然 12 是可能的,甚至 20 是远程可行的,等等,如果他们有额外的手指。你可以明白我的意思)Lat / Longs 是永远不会跨越逻辑边界的数字的完美示例(除非您使用环绕值......)。也就是说,它们总是在 -90 和 +90 之间(只是猜测是哪种类型的经纬度)——这很容易减少/转换,因为值的范围非常简洁。
因此,我们没有“直接”依赖任何第三方文献。仅适用于为特定类型的数据设计的算法。
在其他项目中,对于快速实时 DSP(处理),更聪明的(经验丰富的游戏程序员)编码人员会将浮点数转换为 16 位整数,并计算出全局缩放因子以提供特定数据流的最大精度(加速度计、LVDT ,压力表等)您正在收集。
这减少了传输和存储的数据,而不会丢失任何信息。同样,对于实时波/信号数据,您可以使用(快速)傅立叶变换将噪声波转换为其幅度、相位和频谱分量——实际上是数据值的一半,而不会实际丢失任何(重要)数据。 (在这些算法中,数据“丢失”是完全可测量的——因此您可以确定您是否确实丢失了数据)
同样有诸如降雨分析之类的算法(与降雨无关,更多关于周期和频率)会大量减少您的数据。峰值检测和矢量分析对于其他一些信号就足够了,基本上会丢弃大约 99% 的数据……这个列表是无穷无尽的,但该技术必须非常适合您的数据。而且您可能有许多不同类型的数据,每种数据都适用于不同的“减少”技术。我相信你可以谷歌“无损数据减少”(虽然我认为无损这个词是由音乐处理创造的,并且有点误导,因为数字音乐已经失去了上限和下限频率范围......我离题了)......请发布您发现的内容(当然,如果您有时间/有兴趣进一步研究)
我有兴趣讨论您的元数据,也许可以相当优雅地“减少”一大块......
【讨论】:
嗨@Grantly,非常感谢您对我的问题的贡献。是的,生产者-消费者的规模非常好,这可能是目前最重要的目标。在您建议的级别上,我没有过多考虑压缩我的数据模型,因为就像您说的那样,这可能很费力,并且我需要在之后进行更多测试以确保数据在减压后是正确的,因为我正在做模拟所以准确性很重要。您能否包含一个或两个资源链接,以供将来阅读,我可以在其中了解有关此类数据压缩和代码示例的更多信息?问候 @Sergey - 已在我的答案中添加了 EDIT,希望对您有所帮助以上是关于用于处理大量数据(100 万条记录及更多)的数据结构和技术的主要内容,如果未能解决你的问题,请参考以下文章
处理具有数百万条记录更新和大量读数的 MySQL 表的最佳方法