如何在 Scala 中处理大列表?
Posted
技术标签:
【中文标题】如何在 Scala 中处理大列表?【英文标题】:How to process a large list in Scala? 【发布时间】:2014-12-30 22:20:29 【问题描述】:我曾经用 Python 处理统计数据。比如一个大文件包含几千万个id:
$ cat report_ids | head
3788065
7950319
140494477
182851142
120757318
160033281
78087029
42591118
104363873
212143796
...
在 IPython 中,以下几行始终运行良好。
In [1]: lines = [line.strip() for line in open('./report_ids').readlines()]
In [2]: from collections import Counter
In [3]: d = Counter(lines)
In [4]: d[lines[0]]
Out[4]: 9
当我在 Scala 中尝试相同的操作时,会发生内存不足错误。
val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = List(3788065, 7950319, 140494477, 182851142, 120757318, 160033281, 78087029, 42591118, 104363873, 212143796, 175644298, 112703123, 213308679, 109649718, 11947300, 214660563, 83402867, 162877289, 83030111, 78231639, 45129180, 11635655, 34778452, 46604760, 142519099, 213261965, 137812002, 167057636, 119258917, 212722777, 177979907, 13754217, 156769524, 40682536, 202195379, 91879046, 22766751, 6656279, 11972540, 76929862, 91616020, 110579570, 143849021, 27239477, 65146692, 142968764, 153891284, 182405787, 153038108, 50714639, 113386401, 96657813, 75908413, 32215626, 175000692, 154337083, 113754207, 165109267, 3788065, 42285876, 171004203, 109802388, 92956305, 46690091, 103638776, 15141632, 110579570, 120984867, 183167775, 86841540, 60465849, 27239477, 91760184, 213464...
scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:326)
at scala.collection.immutable.HashMap.$plus(HashMap.scala:57)
at scala.collection.immutable.HashMap.$plus(HashMap.scala:36)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:334)
at scala.collection.TraversableLike$$anonfun$groupBy$3.apply(TraversableLike.scala:333)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:333)
at scala.collection.AbstractTraversable.groupBy(Traversable.scala:105)
at .<init>(<console>:8)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
然后我尝试了Scala中的lazy方法,还是不行。
scala> lazy val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
lines: List[String] = <lazy>
scala> val g = lines.groupBy(e => e).mapValues(x => x.length)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.io.BufferedReader.readLine(BufferedReader.java:349)
at java.io.BufferedReader.readLine(BufferedReader.java:382)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
at .lines$lzycompute(<console>:7)
at .lines(<console>:7)
at .<init>(<console>:8)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
那么我怎样才能像在 Python 中那样在 Scala 中完成小组工作呢?谢谢。
【问题讨论】:
【参考方案1】:我认为在 Python 和 Scala 中做一些完全不同的事情。让我们看看第一行:
lines = [line.strip() for line in open('./report_ids').readlines()]
在我看来,您在这里使用的是可迭代对象(以 Scala 术语),而不是真正的列表。我可能是错的——我使用 Python 工作的频率不足以记住——但让我们假设你是这样,看看你如何在 Scala 中获得同样的东西。你有这个:
val lines = scala.io.Source.fromFile("./report_ids").getLines.toList
现在,Scala 在标准库中没有可迭代的文件,尽管我认为 Scala I/O 自带了一个。在这里,您可以这样做以获取迭代器(不是同一件事):
val lines = scala.io.Source.fromFile("./report_ids").getLines
只是不要把它变成List
。 :) 现在,因为这是一个迭代器,而不是一个可迭代的,所以一旦你使用它两次它就会失败。所以让我们这样写吧:
def lines = scala.io.Source.fromFile("./report_ids").getLines
现在您可以多次使用“行”。遗憾的是,您会泄漏文件描述符——对于更严格的 I/O 处理,请查看更严格的 I/O 库,例如 Scalaz Stream 或 Scala I/O。或者使用贷款模式。
接下来,将Counter
代码替换为:
val g = lines.groupBy(e => e).mapValues(x => x.length)
这将是内存密集型的。这样的事情应该会好很多:
val g = scala.collection.mutable.HashMap.empty[String, Int] withDefaultValue 0
for (line <- lines) g(line) += 1
由于lines
是Iterator
,您将无法执行lines(0)
。对于第一行,您可以只使用lines.next
,也可以使用lines.toStream(0)
来访问索引,而无需将整个文件读入内存。
【讨论】:
以上是关于如何在 Scala 中处理大列表?的主要内容,如果未能解决你的问题,请参考以下文章