鉴于我将 DataBag 溢出到磁盘,为啥此 Pig UDF 会导致“错误:Java 堆空间”?

Posted

技术标签:

【中文标题】鉴于我将 DataBag 溢出到磁盘,为啥此 Pig UDF 会导致“错误:Java 堆空间”?【英文标题】:Why does this Pig UDF Result in an "Error: Java heap space" Given that I am Spilling the DataBag to Disk?鉴于我将 DataBag 溢出到磁盘,为什么此 Pig UDF 会导致“错误:Java 堆空间”? 【发布时间】:2014-02-05 02:03:30 【问题描述】:

这是我的 UDF:

public DataBag exec(Tuple input) throws IOException  
    Aggregate aggregatedOutput = null;
    
    int spillCount = 0;

    DataBag outputBag = BagFactory.newDefaultBag(); 
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) 
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (some condition regarding current input tuple)
            //do something to aggregatedOutput with information from input tuple
         else 
            //Because input tuple does not apply to current aggregateOutput
            //return current aggregateOutput and apply input tuple
            //to new aggregateOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutputTuple = new Aggregate(tuple);
            
            
            if (spillCount == 1000) 
                outputBag.spill();
                spillCount = 0;
            
        
    
    return outputBag; 

请注意,每输入 1000 个元组,包就会溢出到磁盘。我已将此数字设置为低至 50 和高至 100,000,但仍然收到内存错误:

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

我能做些什么来解决这个问题?它正在处理大约一百万行。

这是解决方案

使用累加器接口:

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> 
    private DataBag outputBag = null;
    private UltraAggregation currentAggregation = null;
    
    public void accumulate(Tuple input) throws IOException 
        DataBag values = (DataBag)input.get(0);
        Aggregate aggregatedOutput = null;
        outputBag = BagFactory.getInstance().newDefaultBag();
        
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) 
            Tuple tuple = iterator.next();
            ...
            if (some condition regarding current input tuple)
                //do something to aggregatedOutput with information from input tuple
             else 
                //Because input tuple does not apply to current aggregateOutput
                //return current aggregateOutput and apply input tuple
                //to new aggregateOutput
                outputBag.add(aggregatedOutput.getTuple());
                aggregatedOutputTuple = new Aggregate(tuple);
            
        
    
    
    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() 
        //Add final current aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    
    // This is called after getValue()
    // Not sure if these commands are necessary as they are repeated in beginning of accumulate
    public void cleanup() 
        outputBag = null;
        currentAggregation = null;
    
    
    public DataBag exec(Tuple input) throws IOException 
        // Same as above ^^ but this doesn't appear to ever be called.
    
    
    public Schema outputSchema(Schema input) 
        try 
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
         catch FrontendException e) 
            e.printStackTrace();
            return null;
        
    
    
    class Aggregate 
        ...
        public Tuple getTuple() 
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try 
                output.set(0, val);
                ...
             catch (ExecException e) 
                e.printStackTrace();
                return null;
            
        
        ...
    

【问题讨论】:

【参考方案1】:

您应该在每次追加到 outputBag 时增加 spillCount,而不是每次从迭代器中获取元组时。只有当spillCount 是1000 的倍数并且不满足您的if 条件时,您才会溢出,这可能不会经常发生(取决于逻辑)。这可以解释为什么不同的溢出阈值没有太大差异。

如果这不能解决您的问题,我会尝试扩展 AccumulatorEvalFunc&lt;DataBag&gt;。在您的情况下,您实际上不需要访问整个包。您的实现适合累加器样式实现,因为您只需要访问当前元组。这可能会减少内存使用量。本质上,您将拥有一个 DataBag 类型的实例变量来累积最终输出。您还将有一个 aggregatedOutput 的实例变量,该变量将具有当前聚合。对 accumulate() 的调用将 1) 更新当前聚合,或 2) 将当前聚合添加到 aggregatedOutput 并开始一个新聚合。这基本上遵循您的 for 循环的主体。

【讨论】:

开枪,我道歉;每次溢出后我将spillCount 重置为 0,但忘记将其包含在伪代码中。你能检查一下更新的帖子吗?我会检查你答案的另一部分。谢谢。 在这种情况下,重置为零不会有任何影响。在调用next() 时不要增加spillCount,而应该在附加到outputBag 时增加它。 spillCount 的目的是跟踪包的大小。相反,您使用它来跟踪您处理了多少元组,这不是一回事。 对于迟到的回复,我深表歉意。我现在在附加到outputBag 时增加spillCount 以跟踪包的大小。我仍然收到内存不足错误。接下来我将实现 AccumulatorEvalFunc 并告诉你它是如何进行的。 快速问题:Pig Book 指出“Pig 的包在超过一定大小阈值或仅剩余一定量的堆空间时自动处理将数据溢出到磁盘。溢出到磁盘是昂贵的,应该是尽可能避免。但如果你必须在一个包中存储大量数据,Pig 会管理它。如果 Pig 处理它,我为什么需要洒? 是的,我以前也看到过这个建议。但是在过去我发现即使它自动溢出,您有时仍然会遇到 OOM 错误。你有多少内存可用?你知道你为 mapred.child.java.opts 设置了什么吗?您可能需要增加最大堆大小,因为默认值对于您的任务来说可能太小了。您可以检查 jobconf 以查看您设置的内容。如果低于此值,您可以尝试将其增加到 -Xmx1024M 之类的值。

以上是关于鉴于我将 DataBag 溢出到磁盘,为啥此 Pig UDF 会导致“错误:Java 堆空间”?的主要内容,如果未能解决你的问题,请参考以下文章

无法将 128 分配给字节,为啥它会出错而不是溢出?

除非我添加溢出,否则 CSS 背景颜色不会显示:隐藏?为啥?

PostgreSQL:十进制/数字数据类型上的数字字段溢出 - 为啥会出现此错误

查询溢出到磁盘

为啥当我将查询字符串添加到我的 url 时,它会被 xampp 拒绝(“在此服务器上找不到请求的 URL”)

对于 Databag 中的每个元组都在一次又一次地从 try 块中执行