UDF bag of tuples causes error "Long cannot be cast to Tuple"

Posted: 2013-12-18 20:12:28

I have a Java UDF that takes a tuple and returns a bag of tuples. When I operate on that bag (see the code below), I get the error message

2013-12-18 14:32:33,943 [main] ERROR org.apache.pig.tools.pigstats.PigStats - ERROR: java.lang.Long cannot be cast to org.apache.pig.data.Tuple

I cannot recreate this error by simply reading data, grouping, and flattening; it only happens with the bag of tuples returned by the UDF, even though the DESCRIBE-ed data looks identical to the result of a group/flatten/etc.
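For comparison, a plain pipeline like the following minimal sketch (same input and schema, but no UDF) does not raise the cast error:

-- Sketch: group and flatten without the UDF; this variant runs cleanly.
A = LOAD 'test-input.txt' USING PigStorage(',')
        AS (id:long, time:long, lat:double, lon:double, alt:double);
A_grouped = GROUP A BY id;
B = FOREACH A_grouped GENERATE FLATTEN(A);
C = FOREACH B GENERATE A::id, A::time;
DUMP C;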

Update: here is actual code that reproduces the error. (Thanks to anyone who takes the time to read through it.)

REGISTER test.jar;
A = LOAD 'test-input.txt' using PigStorage(',')
         AS (id:long, time:long, lat:double, lon:double, alt:double);
A_grouped = GROUP A BY (id);
U_out = FOREACH A_grouped
        GENERATE FLATTEN(
                test.Test(A)
        );
DESCRIBE U_out;
V = FOREACH U_out GENERATE output_tuple.id, output_tuple.time;
DESCRIBE V;
rmf test.out
STORE V INTO 'test.out' using PigStorage(',');

The file 'test-input.txt':

0,1000,33,-100,5000
0,1010,33,-101,6000
0,1020,33,-102,7000
0,1030,33,-103,8000
1,1100,34,-100,15000
1,1110,34,-101,16000
1,1120,34,-102,17000
1,1130,34,-103,18000

Output:

$ pig -x local test.pig
    2013-12-18 16:47:50,467 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/jsnider/pig_1387403270431.log
    2013-12-18 16:47:50,751 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
    U_out: bag_of_tuples::output_tuple: (id: long,time: long,lat: double,lon: double,alt: double)
    V: id: long,time: long
    2013-12-18 16:47:51,532 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
    2013-12-18 16:47:51,532 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
    2013-12-18 16:47:51,907 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: V: Store(file:///home/jsnider/test.out:PigStorage(',')) - scope-32 Operator Key: scope-32)
    2013-12-18 16:47:51,929 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
    2013-12-18 16:47:51,988 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
    2013-12-18 16:47:51,988 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
    2013-12-18 16:47:51,996 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer - Reducer is to run in accumulative mode.
    2013-12-18 16:47:52,139 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
    2013-12-18 16:47:52,158 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
    2013-12-18 16:47:52,199 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
    2013-12-18 16:47:54,225 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
    2013-12-18 16:47:54,249 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=164
    2013-12-18 16:47:54,249 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
    2013-12-18 16:47:54,299 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
    2013-12-18 16:47:54,299 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
    2013-12-18 16:47:54,308 [Thread-1] INFO  org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library
    2013-12-18 16:47:54,601 [Thread-1] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
    2013-12-18 16:47:54,601 [Thread-1] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
    2013-12-18 16:47:54,627 [Thread-1] WARN  org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library is available
    2013-12-18 16:47:54,627 [Thread-1] INFO  org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library loaded
    2013-12-18 16:47:54,633 [Thread-1] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
    2013-12-18 16:47:54,801 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
    2013-12-18 16:47:54,965 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.system.dir;  Ignoring.
    2013-12-18 16:47:54,966 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.trash.interval;  Ignoring.
    2013-12-18 16:47:54,966 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.retain.hours;  Ignoring.
    2013-12-18 16:47:54,968 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.limit.kb;  Ignoring.
    2013-12-18 16:47:54,970 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.temp.dir;  Ignoring.
    2013-12-18 16:47:54,991 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
    2013-12-18 16:47:54,994 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local_0001_m_000000_0
    2013-12-18 16:47:55,047 [pool-1-thread-1] INFO  org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
    2013-12-18 16:47:55,053 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task -  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ffeef1
    2013-12-18 16:47:55,058 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Processing split: Number of splits :1
    Total Length = 164
    Input split[0]:
       Length = 164
      Locations:

    -----------------------

    2013-12-18 16:47:55,068 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
    2013-12-18 16:47:55,118 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
    2013-12-18 16:47:55,118 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
    2013-12-18 16:47:55,152 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Starting flush of map output
    2013-12-18 16:47:55,164 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 0
    2013-12-18 16:47:55,167 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    2013-12-18 16:47:55,170 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,171 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
    2013-12-18 16:47:55,171 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner - Finishing task: attempt_local_0001_m_000000_0
    2013-12-18 16:47:55,172 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
    2013-12-18 16:47:55,192 [Thread-2] INFO  org.apache.hadoop.mapred.Task -  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@38650646
    2013-12-18 16:47:55,192 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,196 [Thread-2] INFO  org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
    2013-12-18 16:47:55,201 [Thread-2] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 418 bytes
    2013-12-18 16:47:55,201 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,257 [Thread-2] WARN  org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
    java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.pig.data.Tuple
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:408)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:138)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:312)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:360)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
            at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
            at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
            at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
            at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:392)
    2013-12-18 16:47:55,477 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
    2013-12-18 16:47:59,995 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - job job_local_0001 has failed! Stop running all dependent jobs
    2013-12-18 16:48:00,008 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
    2013-12-18 16:48:00,010 [main] ERROR org.apache.pig.tools.pigstats.PigStatsUtil - 1 map reduce job(s) failed!
    2013-12-18 16:48:00,011 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
    2013-12-18 16:48:00,015 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics:

    HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
    0.20.2-cdh3u6   0.8.1-cdh3u6    jsnider 2013-12-18 16:47:52     2013-12-18 16:48:00     GROUP_BY

    Failed!

    Failed Jobs:
    JobId   Alias   Feature Message Outputs
    job_local_0001  A,A_grouped,U_out,V     GROUP_BY        Message: Job failed! Error - NA file:///home/jsnider/test.out,

    Input(s):
    Failed to read data from "file:///home/jsnider/test-input.txt"

    Output(s):
    Failed to produce result in "file:///home/jsnider/test.out"

    Job DAG:
    job_local_0001


    2013-12-18 16:48:00,015 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
    2013-12-18 16:48:00,040 [main] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2244: Job failed, hadoop does not return any error message
    Details at logfile: /home/jsnider/pig_1387403270431.log

And the three Java files:

Test.java:

package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Test extends EvalFunc<DataBag> implements Accumulator<DataBag> {

    public static ArrayList<Point> points = null;

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        accumulate(input);
        DataBag output = getValue();
        cleanup();
        return output;
    }

    public void accumulate(DataBag b) throws IOException {
        try {
            if (b == null)
                return;
            Iterator<Tuple> fit = b.iterator();
            while (fit.hasNext()) {
                Tuple f = fit.next();
                storePt(f);
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    public void accumulate(Tuple b) throws IOException {
        try {
            if (b == null || b.size() == 0)
                return;
            for (Object f : b.getAll()) {
                if (f instanceof Tuple) {
                    storePt((Tuple)f);
                } else if (f instanceof DataBag) {
                    accumulate((DataBag)f);
                } else {
                    throw new IOException("tuple input is not a tuple or a databag... x__x");
                }
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    @Override
    public DataBag getValue() {

        if (points == null)
            points = new ArrayList<Point>();
        Collections.sort(points);

        DataBag myBag = BagFactory.getInstance().newDefaultBag();

        for (Point pt : points) {
            Measure sm = new Measure(pt);
            myBag.add(sm.asTuple());
        }
        return myBag;
    }

    public void cleanup() {
        points = null;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema tupleFs
                = new Schema.FieldSchema("output_tuple", Measure.smSchema(), DataType.TUPLE);
            Schema bagSchema = new Schema(tupleFs);
            Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples", bagSchema, DataType.BAG);
            return new Schema(bagFs);
        } catch (Exception e) {
            return null;
        }
    }

    public static void storePt(Tuple f) {
        Object[] field = f.getAll().toArray();

        Point pt = new Point(
                field[0] == null ? 0 : (Long)field[0],
                field[1] == null ? 0 : (Long)field[1],
                field[2] == null ? 0 : (Double)field[2],
                field[3] == null ? 0 : (Double)field[3],
                field[4] == null ? Double.MIN_VALUE : (Double)field[4]
            );

        if (points == null)
            points = new ArrayList<Point>();

        points.add(pt);
    }
}
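As an aside, later Pig releases (roughly 0.9 onward) let a UDF declare its schema from a string, which is less error-prone than building nested FieldSchema objects by hand. A hypothetical sketch of outputSchema in that style, assuming org.apache.pig.impl.util.Utils.getSchemaFromString is available in your Pig version:

// Hypothetical alternative, assuming Utils.getSchemaFromString exists in
// your Pig version; it parses the same bag-of-tuples schema from a string.
public Schema outputSchema(Schema input) {
    try {
        return org.apache.pig.impl.util.Utils.getSchemaFromString(
            "bag_of_tuples: {output_tuple: (id: long, time: long,"
            + " lat: double, lon: double, alt: double)}");
    } catch (Exception e) {
        return null;
    }
}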

Point.java:

package test;

public class Point implements Comparable<Point> {
    long id;
    long time;
    double lat;
    double lon;
    double alt;

    public Point(Point c) {
        this.id = c.id;
        this.time = c.time;
        this.lat = c.lat;
        this.lon = c.lon;
        this.alt = c.alt;
    }

    public Point(long l, long m, double d, double e, double f) {
        id = l;
        time = m;
        lat = d;
        lon = e;
        alt = f;
    }

    @Override
    public int compareTo(Point other) {
        final int BEFORE = -1;
        final int EQUAL = 0;
        final int AFTER = 1;

        if (this == other) return EQUAL;
        if (this.id < other.id) return BEFORE;
        if (this.id > other.id) return AFTER;
        if (this.time < other.time) return BEFORE;
        if (this.time > other.time) return AFTER;
        if (this.lat > other.lat) return BEFORE;
        if (this.lat < other.lat) return AFTER;
        if (this.lon > other.lon) return BEFORE;
        if (this.lon < other.lon) return AFTER;
        if (this.alt > other.alt) return BEFORE;
        if (this.alt < other.alt) return AFTER;
        return EQUAL;
    }

    public String toString() {
        return id + " " + time;
    }
}

Measure.java:

package test;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Measure {
    private long id;
    private long time;
    private double lat;
    private double lon;
    private double alt;

    public Measure(Point pt) {
        id = pt.id;
        time = pt.time;
        lat = pt.lat;
        lon = pt.lon;
        alt = pt.alt;
    }

    public Tuple asTuple() {
        Tuple myTuple = TupleFactory.getInstance().newTuple();
        myTuple.append(id);
        myTuple.append(time);
        myTuple.append(lat);
        myTuple.append(lon);
        myTuple.append(alt);
        return myTuple;
    }

    public static Schema smSchema() {
        Schema tupleSchema = new Schema();
        tupleSchema.add(new Schema.FieldSchema("id", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("time", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("lat", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("lon", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("alt", DataType.DOUBLE));
        return tupleSchema;
    }
}

Comments:

- Can you post the code of your UDF?
- I can't easily post my code: I can't share the actual code for work reasons, and it's too large to sanitize for sharing. In which part would you expect to find the error? The Schema, how the output bag is built, or...? If I can't find a solution, I'll write a small UDF to try to reproduce the error in a clean context.
- Post the outputSchema method (if defined), the signature of the exec method, and how the output bag is built.
- I've added the full code, which you can now see (along with everything else).
- Does your code run if you don't use FLATTEN and just DUMP U_out?

Answer 1:

The solution is to cast the UDF's return value to the corresponding bag type:

U_out = FOREACH A_grouped
    GENERATE FLATTEN(
        (bag{tuple(long,long,double,double,double)})(test.Test(A))
    ) AS (id:long, time:long, lat:double, lon:double, alt:double);

Even though the schema returned by the UDF is correct, the output still needs this cast in order to work.
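With the cast and AS clause in place, the flattened fields carry the names given in the AS clause, so the rest of the script can project them directly. A minimal sketch under that assumption:

-- Sketch: after the cast above, V can reference the renamed fields directly.
V = FOREACH U_out GENERATE id, time;
STORE V INTO 'test.out' USING PigStorage(',');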
