Spark Java 错误:大小超过 Integer.MAX_VALUE

Posted

技术标签:

【中文标题】Spark Java 错误:大小超过 Integer.MAX_VALUE【英文标题】:Spark Java Error: Size exceeds Integer.MAX_VALUE 【发布时间】:2015-05-12 02:11:14 【问题描述】:

我正在尝试将 spark 用于一些简单的机器学习任务。 我使用 pyspark 和 spark 1.2.0 做了一个简单的逻辑回归问题。 我有 120 万条记录用于训练,我对记录的特征进行了哈希处理。 当我将哈希特征数设置为 1024 时,程序运行良好,但当我将哈希特征数设置为 16384 时,程序多次失败并出现以下错误:

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

当我在将数据传输到 LabeledPoint 后训练 LogisticRegressionWithSGD 时发生此错误。

有人对此有想法吗?

我的代码如下(我为此使用了 IPython Notebook):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array
from sklearn.feature_extraction import FeatureHasher
from pyspark import SparkContext
sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30)
sc = SparkContext(conf=sf)
training_file = sc.textFile("train_small.txt")
def hash_feature(line):
    values = [0, dict()]
    for index, x in enumerate(line.strip("\n").split('\t')):
        if index == 0:
            values[0] = float(x)
        else:
            values[1][str(index)+"_"+x] = 1
    return values
n_feature = 2**14
hasher = FeatureHasher(n_features=n_feature)
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])])
def build_lable_points(line):
    values = [0.0] * n_feature
    for index, value in zip(line[1].indices, line[1].data):
        values[index] = value
    return LabeledPoint(line[0], values)
parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line))
model = LogisticRegressionWithSGD.train(parsed_training_data)

执行最后一行时发生错误。

【问题讨论】:

你能展示你的代码吗? 代码已添加到原帖中,谢谢 你可以尝试更多的分区吗? (我认为更多的分区意味着更少的分区数据,所以它应该可以解决问题)。 【参考方案1】:

Integer.MAX_INT 限制是对所存储文件的大小。 120 万行不是什么大事,我不确定你的问题是“火花的极限”。更有可能的是,您的某些工作正在创建太大而无法由任何给定执行者处理的东西。

我不是 Python 编码员,但是当您“对记录的特征进行哈希处理”时,您可能会获取一组非常稀疏的记录作为样本并创建一个非稀疏数组。这将意味着 16384 个功能需要大量内存。特别是,当您执行zip(line[1].indices, line[1].data) 时。唯一没有让你记忆犹新的原因是你似乎已经配置了它的垃圾负载(50G)。

另一件可能有帮助的事情是增加分区。所以如果你不能让你的行使用更少的内存,至少你可以尝试在任何给定的任务上使用更少的行。创建的任何临时文件都可能依赖于此,因此您不太可能达到文件限制。


而且,与错误完全无关,但与您要执行的操作相关:

16384 确实是一大堆特征,在乐观的情况下,每一个都只是一个布尔特征,你总共有 2^16384 种可能的排列可供学习,这是一个巨大的数字(在这里试试:@ 987654321@)。

很有可能没有算法能够仅使用 120 万个样本来学习决策边界,您可能需要至少数万亿个样本才能在这样的特征空间上有所作为。机器学习有其局限性,因此如果您没有获得比随机更好的准确度,请不要感到惊讶。

我肯定会建议先尝试某种降维方法!

【讨论】:

谢谢。通过在加载数据时使用更多分区来解决此问题。我们只是在小数据集上进行测试并获得一些想法,然后我们将应用到具有更强大机器的大数据集。【参考方案2】:

在某些时候,它会尝试存储功能,并且 1.2M * 16384 大于 Integer.MAX_INT,因此您尝试存储的功能超过了 Spark 支持的最大大小。

您可能遇到了 Apache Spark 的限制。

【讨论】:

谢谢。你能详细说明一下吗?我从未听说过 spark 支持的最大功能大小。我知道 spark 的块大小有限制,请参阅issues.apache.org/jira/browse/SPARK-1476,我不确定我是否正在点击这个,但如果我点击这个,我想知道如何在不减少特征数量和记录数量的情况下避免这种情况【参考方案3】:

增加分区数可能会导致Active tasks is a negative number in Spark UI,这可能意味着分区数过多。

【讨论】:

以上是关于Spark Java 错误:大小超过 Integer.MAX_VALUE的主要内容,如果未能解决你的问题,请参考以下文章

Spark:连接 2 个大型 DF 时,大小超过 Integer.MAX_VALUE

火花错误 - 小数精度 39 超过最大精度 38

spark.driver.maxResultSize 是啥?

Apache Spark Codegen Stage 超过 64 KB

Spark Cassandra 使用区分大小写的名称写入 UDT 失败

Spark - 写入 128 MB 大小的 parquet 文件