如何处理大型但不是大数据的数据集?
Posted
技术标签:
【中文标题】如何处理大型但不是大数据的数据集?【英文标题】:How to handle large yet not big-data datasets? 【发布时间】:2020-05-28 02:33:24 【问题描述】:我有一个约 200gb 的数据集,包含大约 15 亿个观测值,我需要在其上运行一些条件分析和数据聚合*。
问题是我不习惯(也没有受过处理)大型数据集。我通常在 R 或 Python 上工作(旁边有一些 Julia),当我无法将数据集放入内存时,我完全迷失了。
人们如何处理这些适合磁盘但不适合内存的数据集?我应该从哪里开始寻找解决方案?是否有一个地方集中了大型但非大数据数据集的信息?
*长话短说,我有另一个数据集(适合内存),对于这个小数据集的每一行,我想计算大数据集中与小数据集的某些条件匹配的观察数。我最初的反应是分块运行代码,但这非常低效,并且需要几个世纪的单处理器计算时间。
既然已经特别问过了,那我就描述一下我的文件结构吧。
我有一个大文件,我们称之为 BIG,有(特别是)两个 ID 变量,$ID0$ 和 $ID1$ 和一个日期变量 $date1$。
我有一个小文件,我们称之为 SMALL,有两个 ID 变量,$ID2$ 和 $ID3 $ 和一个日期变量 $date2$。
对于每个 $ID2_i$,我想计算所有观察结果
$\ID0 = ID2_i, date1
【问题讨论】:
您需要提供文件的具体结构以及要从中提取哪些信息。想想你真正需要什么信息,不要只是把原始文件放到你的内存中。例如,您可能不需要确切的值,但可以直接将该值填充到直方图中。我敢打赌,你的问题只需要几 mb 的 RAM 就可以解决! 获取更多 RAM,使用随机采样,分段处理数据,使用近似值... 是的,我可以使用 1mb 的内存来解决这个问题。主要问题是我需要在我的小数据集的每一行运行条件计数一次,这意味着 160 万次,当每个计数超过 15 亿次观察时,这非常长。我也无法近似,因为我需要确切的计数。我不是在为我的问题寻求答案,因为它很长且无趣,但主要是在线资源或良好实践的方向。 如果是“大数据”,你会如何运行它? 如果没有考虑到应用程序,数据对于统计学家来说并不重要。回归、绘图等都有方便的分块方法,可能是一个主题问题。一般来说,只能说获得 SAS 许可证。 【参考方案1】:我可能误解了您的问题,但是在我看来,将大文件分块(如 cmets 中所建议的那样)似乎是最直接的方法。
假设您将 200 GB 的文件分成 100 个块,然后遍历这些块并为每个块进行所需的计数,然后汇总结果。如果每块操作在几分钟内运行,除非您想一遍又一遍地执行此操作,否则应该没问题。
要获得更具体的建议,我需要更多地了解数据存储格式。我们在谈论一个大的.csv
文件吗?在这种情况下,对于 R,您可能会查看 chunked API of the readr
package。为了在 R 中尽快再次进行计数,data.table
包可能会派上用场。
编辑:添加一些示例代码
这不会完全按照您的要求进行,但希望涵盖一些关键点,以便按照我的建议解决问题。
library(data.table)
library(readr)
ids <- seq.int(1, 1e2)
dates <- seq(as.Date("1999/01/01"), as.Date("2000/01/01"), by = "day")
big <- data.table(id0 = sample(ids, 1e6, replace = TRUE),
id1 = sample(ids, 1e6, replace = TRUE),
date1 = sample(dates, 1e6, replace = TRUE))
write.csv(big, "big.csv", row.names = FALSE)
small <- data.table(id2 = sample(ids, 1e2),
id3 = sample(ids, 1e2),
date2 = sample(dates, 1e2))
count_fun <- function(x, pos, acc)
setDT(x)
tmp <- small[x, list(counts = .N),
on = c("id2 == id0", "id3 == id1", "date2 > date1"),
by = .EACHI, nomatch = NULL]
acc[tmp$id2] <- acc[tmp$id2] + tmp$counts
acc
accumulator <- AccumulateCallback$new(count_fun, acc = rep(0, length(ids)))
counts <- read_csv_chunked("big.csv", accumulator, chunk_size = 1e4)
【讨论】:
是的,它是 csv,但我可以得到另一种格式,这不是问题。 我用 R 中的一些示例代码更新了我的问题。如果您有任何问题,请随时提问。【参考方案2】:有不同的方法
将数据集分块(在未来节省时间,但需要初始时间投入)
分块可以让您简化许多操作,例如洗牌等。
确保每个子集/块都代表整个数据集。每个块文件应该有相同数量的行。
这可以通过将一行追加到一个又一个文件来完成。很快,您就会意识到打开每个文件并写一行是低效的。尤其是在同一个驱动器上读写时。 -> 添加适合内存的写入和读取缓冲区。
选择适合您需要的块大小。我选择这个特定的大小是因为我的默认文本编辑器仍然可以相当快地打开它。
较小的块可以提高性能,特别是如果您想获得类分布等指标,因为您只需遍历一个代表性文件即可获得对整个数据集的估计,这可能就足够了。 较大的块文件确实可以更好地表示每个文件中的整个数据集,但您也可以只浏览 x 个较小的块文件。
我确实为此使用了 c#,因为我在那里更有经验,因此我可以使用完整的功能集,例如将任务 reading / processing / writing
拆分到不同的线程。
如果您有使用 python 或 r 的经验,我怀疑应该也有类似的功能。在如此大的数据集上,并行化可能是一个重要因素。
可以将分块数据集建模为一个交错数据集,您可以使用张量处理单元对其进行处理。这可能会产生最好的性能之一,并且可以在本地以及真正大型机器上的云中执行。但这需要对 tensorflow 进行大量学习。
使用阅读器,逐步阅读文件
您不想做all_of_it = file.read()
之类的事情,而是想使用某种流阅读器。以下函数逐行读取其中一个块文件(或整个 300gb 数据集)以计算文件中的每个类。通过一次处理一行,您的程序不会溢出内存。
您可能想要添加一些进度指示,例如 X 行/秒或 X MBbs,以便估算总处理时间。
def getClassDistribution(path):
classes = dict()
# open sample file and count classes
with open(path, "r",encoding="utf-8",errors='ignore') as f:
line = f.readline()
while line:
if line != '':
labelstring = line[-2:-1]
if labelstring == ',':
labelstring = line[-1:]
label = int(labelstring)
if label in classes:
classes[label] += 1
else:
classes[label] = 1
line = f.readline()
return classes
我使用分块数据集和估计的组合。
性能缺陷
cells = int(line.Split(',')[8])
等预制函数,这将很快导致内存吞吐量瓶颈。一个恰当的例子可以在getClassDistribution
找到,我只想得到标签。
以下 C# 函数将 csv 行快速拆分为元素。
// Call function
ThreadPool.QueueUserWorkItem((c) => AnalyzeLine("05.02.2020,12.20,10.13").Wait());
// Parralelize this on multiple cores/threads for ultimate performance
private async Task AnalyzeLine(string line)
PriceElement elementToAdd = new PriceElement();
int counter = 0;
string temp = "";
foreach (char c in line)
if (c == ',')
switch (counter)
case 0:
elementToAdd.spotTime = DateTime.Parse(temp, CultureInfo.InvariantCulture);
break;
case 1:
elementToAdd.buyPrice = decimal.Parse(temp);
break;
case 2:
elementToAdd.sellPrice = decimal.Parse(temp);
break;
temp = "";
counter++;
else temp += c;
// compare the price element to conditions on another thread
Observate(elementToAdd);
创建数据库并加载数据
在处理类似 csv 的数据时,您可以将数据加载到数据库中。 数据库是为容纳大量数据而设计的,您可以期待非常高的性能。 与原始数据相比,数据库可能会占用更多磁盘空间。这就是我不再使用数据库的原因之一。
硬件优化
如果您的代码优化得当,您的瓶颈很可能是硬盘吞吐量。
如果数据适合您的本地硬盘驱动器,请在本地使用它,因为这样可以消除网络延迟(假设本地网络中的每条记录需要 2-5 毫秒,远程位置需要 10-100 毫秒)。 使用现代硬盘。一个 1tb NVME SSD 今天的成本约为 130(intel 600p 1tb)。 nvme ssd 使用 pcie,比普通 ssd 快 5 倍,比普通硬盘快 50 倍,尤其是在快速写入不同位置时(分块数据)。近年来,SSD 的容量已经大大增加,对于这样的任务来说,这将是野蛮的。以下屏幕截图提供了在同一台机器上使用相同数据进行 tensorflow 训练的性能比较。一次只保存在本地标准 ssd 上,一次保存在本地网络中的网络附加存储(普通硬盘)上。
【讨论】:
【参考方案3】:看起来像一个 O(n^2) 问题:BIG 中的每个元素都必须与 BIG 中的所有其他元素进行比较。
也许您可以将内存中所需的所有字段都放在比较中(其余的留在文件中)。 例如:1.5G 观测 x 1 个日期(4 个字节)x 2 个 ID(8 个字节)可以放入 18GB。
也许您可以按日期对 BIG 进行排序,然后您的问题变成 O(n x log(n))。
也许您可以将 BIG 拆分为 ID3i = ID3j 的块。
有很多可能性。
【讨论】:
对于许多这样的连接,可以通过适当的索引将 $O(n^2)$ 性能降低到 $O(n\log(n))$。这是强大的数据库平台提供的服务之一。以上是关于如何处理大型但不是大数据的数据集?的主要内容,如果未能解决你的问题,请参考以下文章