如何在 Scala 中处理大列表?

Posted

技术标签:

【中文标题】如何在 Scala 中处理大列表?【英文标题】:How to process a large list in Scala? 【发布时间】:2014-12-30 22:20:29 【问题描述】:

我曾经用 Python 处理统计数据。比如一个大文件包含几千万个id:

$ cat report_ids | head
3788065
7950319
140494477
182851142
120757318
160033281
78087029
42591118
104363873
212143796
...

在 IPython 中,以下几行始终运行良好。

In [1]: lines = [line.strip() for  line in open('./report_ids').readlines()]

In [2]: from collections import Counter

In [3]: d = Counter(lines)

In [4]: d[lines[0]]
Out[4]: 9

当我在 Scala 中尝试相同的操作时,会发生内存不足错误。

val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = List(3788065, 7950319, 140494477, 182851142, 120757318, 160033281, 78087029, 42591118, 104363873, 212143796, 175644298, 112703123, 213308679, 109649718, 11947300, 214660563, 83402867, 162877289, 83030111, 78231639, 45129180, 11635655, 34778452, 46604760, 142519099, 213261965, 137812002, 167057636, 119258917, 212722777, 177979907, 13754217, 156769524, 40682536, 202195379, 91879046, 22766751, 6656279, 11972540, 76929862, 91616020, 110579570, 143849021, 27239477, 65146692, 142968764, 153891284, 182405787, 153038108, 50714639, 113386401, 96657813, 75908413, 32215626, 175000692, 154337083, 113754207, 165109267, 3788065, 42285876, 171004203, 109802388, 92956305, 46690091, 103638776, 15141632, 110579570, 120984867, 183167775, 86841540, 60465849, 27239477, 91760184, 213464...

scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
    at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:326)
    at scala.collection.immutable.HashMap.$plus(HashMap.scala:57)
    at scala.collection.immutable.HashMap.$plus(HashMap.scala:36)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
    at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:334)
    at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:333)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:333)
    at scala.collection.AbstractTraversable.groupBy(Traversable.scala:105)
    at .<init>(<console>:8)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)

然后我尝试了Scala中的lazy方法,还是不行。

scala> lazy val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = <lazy>

scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.io.BufferedReader.readLine(BufferedReader.java:349)
    at java.io.BufferedReader.readLine(BufferedReader.java:382)
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at .lines$lzycompute(<console>:7)
    at .lines(<console>:7)
    at .<init>(<console>:8)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)

那么我怎样才能像在 Python 中那样在 Scala 中完成小组工作呢?谢谢。

【问题讨论】:

【参考方案1】:

我认为在 Python 和 Scala 中做一些完全不同的事情。让我们看看第一行:

lines = [line.strip() for  line in open('./report_ids').readlines()]

在我看来,您在这里使用的是可迭代对象(以 Scala 术语),而不是真正的列表。我可能是错的——我使用 Python 工作的频率不足以记住——但让我们假设你是这样,看看你如何在 Scala 中获得同样的东西。你有这个:

val lines = scala.io.Source.fromFile("./report_ids").getLines.toList

现在,Scala 在标准库中没有可迭代的文件,尽管我认为 Scala I/O 自带了一个。在这里,您可以这样做以获取迭代器(不是同一件事):

val lines = scala.io.Source.fromFile("./report_ids").getLines

只是不要把它变成List。 :) 现在,因为这是一个迭代器,而不是一个可迭代的,所以一旦你使用它两次它就会失败。所以让我们这样写吧:

def lines = scala.io.Source.fromFile("./report_ids").getLines

现在您可以多次使用“行”。遗憾的是,您会泄漏文件描述符——对于更严格的 I/O 处理,请查看更严格的 I/O 库,例如 Scalaz Stream 或 Scala I/O。或者使用贷款模式。

接下来,将Counter 代码替换为:

val g = lines.groupBy(e => e).mapValues(x => x.length)

这将是内存密集型的。这样的事情应该会好很多:

val g = scala.collection.mutable.HashMap.empty[String, Int] withDefaultValue 0
for (line <- lines) g(line) += 1

由于linesIterator,您将无法执行lines(0)。对于第一行,您可以只使用lines.next,也可以使用lines.toStream(0) 来访问索引,而无需将整个文件读入内存。

【讨论】:

以上是关于如何在 Scala 中处理大列表?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala 中查找列表的引用

如何在Scala中展平不同类型的列表?

如何使用模式匹配在scala中获取一个nonEmpty列表?

如何在Scala中按另一个列表拆分列表

Scala:如何进行字符串连接以避免 GC 开销问题

如何快速学习Scala