UDF bag of tuples causes error "Long cannot be cast to Tuple"
【Posted】2013-12-18 20:12:28
【Question】I have a Java UDF that takes a tuple and returns a bag of tuples. When I operate on that bag (see the code below), I get the following error message:
2013-12-18 14:32:33,943 [main] ERROR org.apache.pig.tools.pigstats.PigStats - ERROR: java.lang.Long cannot be cast to org.apache.pig.data.Tuple
I cannot recreate this error just by loading, grouping, and flattening the data; it only happens with the bag of tuples returned by the UDF, even though the DESCRIBE-d data looks identical to the result of group/flatten/etc.
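For reference, this is roughly the pure-Pig shape that does not reproduce the error (a minimal sketch on the same input file; alias names are illustrative):
A = LOAD 'test-input.txt' USING PigStorage(',')
    AS (id:long, time:long, lat:double, lon:double, alt:double);
A_grouped = GROUP A BY id;
F = FOREACH A_grouped GENERATE FLATTEN(A);
G = FOREACH F GENERATE A::id, A::time;
DUMP G;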
Update: here is actual code that reproduces the error. (Thanks to anyone who takes the time to read through it.)
REGISTER test.jar;
A = LOAD 'test-input.txt' using PigStorage(',')
AS (id:long, time:long, lat:double, lon:double, alt:double);
A_grouped = GROUP A BY (id);
U_out = FOREACH A_grouped
GENERATE FLATTEN(
test.Test(A)
);
DESCRIBE U_out;
V = FOREACH U_out GENERATE output_tuple.id, output_tuple.time;
DESCRIBE V;
rmf test.out
STORE V INTO 'test.out' using PigStorage(',');
The file 'test-input.txt':
0,1000,33,-100,5000
0,1010,33,-101,6000
0,1020,33,-102,7000
0,1030,33,-103,8000
1,1100,34,-100,15000
1,1110,34,-101,16000
1,1120,34,-102,17000
1,1130,34,-103,18000
Output:
$ pig -x local test.pig
2013-12-18 16:47:50,467 [main] INFO org.apache.pig.Main - Logging error messages to: /home/jsnider/pig_1387403270431.log
2013-12-18 16:47:50,751 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
U_out: bag_of_tuples::output_tuple: (id: long,time: long,lat: double,lon: double,alt: double)
V: id: long,time: long
2013-12-18 16:47:51,532 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
2013-12-18 16:47:51,532 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
2013-12-18 16:47:51,907 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: V: Store(file:///home/jsnider/test.out:PigStorage(',')) - scope-32 Operator Key: scope-32)
2013-12-18 16:47:51,929 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2013-12-18 16:47:51,988 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2013-12-18 16:47:51,988 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2013-12-18 16:47:51,996 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer - Reducer is to run in accumulative mode.
2013-12-18 16:47:52,139 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
2013-12-18 16:47:52,158 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2013-12-18 16:47:52,199 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2013-12-18 16:47:54,225 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2013-12-18 16:47:54,249 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=164
2013-12-18 16:47:54,249 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2013-12-18 16:47:54,299 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2013-12-18 16:47:54,299 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2013-12-18 16:47:54,308 [Thread-1] INFO org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library
2013-12-18 16:47:54,601 [Thread-1] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-12-18 16:47:54,601 [Thread-1] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2013-12-18 16:47:54,627 [Thread-1] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library is available
2013-12-18 16:47:54,627 [Thread-1] INFO org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library loaded
2013-12-18 16:47:54,633 [Thread-1] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2013-12-18 16:47:54,801 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2013-12-18 16:47:54,965 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.system.dir; Ignoring.
2013-12-18 16:47:54,966 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.trash.interval; Ignoring.
2013-12-18 16:47:54,966 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.retain.hours; Ignoring.
2013-12-18 16:47:54,968 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.limit.kb; Ignoring.
2013-12-18 16:47:54,970 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.temp.dir; Ignoring.
2013-12-18 16:47:54,991 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
2013-12-18 16:47:54,994 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local_0001_m_000000_0
2013-12-18 16:47:55,047 [pool-1-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
2013-12-18 16:47:55,053 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ffeef1
2013-12-18 16:47:55,058 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: Number of splits :1
Total Length = 164
Input split[0]:
Length = 164
Locations:
-----------------------
2013-12-18 16:47:55,068 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2013-12-18 16:47:55,118 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2013-12-18 16:47:55,118 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2013-12-18 16:47:55,152 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2013-12-18 16:47:55,164 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2013-12-18 16:47:55,167 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-12-18 16:47:55,170 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,171 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
2013-12-18 16:47:55,171 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Finishing task: attempt_local_0001_m_000000_0
2013-12-18 16:47:55,172 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
2013-12-18 16:47:55,192 [Thread-2] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@38650646
2013-12-18 16:47:55,192 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,196 [Thread-2] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2013-12-18 16:47:55,201 [Thread-2] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 418 bytes
2013-12-18 16:47:55,201 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,257 [Thread-2] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.pig.data.Tuple
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:408)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:138)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:312)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:360)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:392)
2013-12-18 16:47:55,477 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
2013-12-18 16:47:59,995 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - job job_local_0001 has failed! Stop running all dependent jobs
2013-12-18 16:48:00,008 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2013-12-18 16:48:00,010 [main] ERROR org.apache.pig.tools.pigstats.PigStatsUtil - 1 map reduce job(s) failed!
2013-12-18 16:48:00,011 [main] INFO org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
2013-12-18 16:48:00,015 [main] INFO org.apache.pig.tools.pigstats.PigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
0.20.2-cdh3u6 0.8.1-cdh3u6 jsnider 2013-12-18 16:47:52 2013-12-18 16:48:00 GROUP_BY
Failed!
Failed Jobs:
JobId Alias Feature Message Outputs
job_local_0001 A,A_grouped,U_out,V GROUP_BY Message: Job failed! Error - NA file:///home/jsnider/test.out,
Input(s):
Failed to read data from "file:///home/jsnider/test-input.txt"
Output(s):
Failed to produce result in "file:///home/jsnider/test.out"
Job DAG:
job_local_0001
2013-12-18 16:48:00,015 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
2013-12-18 16:48:00,040 [main] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2244: Job failed, hadoop does not return any error message
Details at logfile: /home/jsnider/pig_1387403270431.log
And the three Java files:
Test.java:
package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Test extends EvalFunc<DataBag> implements Accumulator<DataBag> {

    public static ArrayList<Point> points = null;

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        accumulate(input);
        DataBag output = getValue();
        cleanup();
        return output;
    }

    public void accumulate(DataBag b) throws IOException {
        try {
            if (b == null)
                return;
            Iterator<Tuple> fit = b.iterator();
            while (fit.hasNext()) {
                Tuple f = fit.next();
                storePt(f);
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    public void accumulate(Tuple b) throws IOException {
        try {
            if (b == null || b.size() == 0)
                return;
            for (Object f : b.getAll()) {
                if (f instanceof Tuple)
                    storePt((Tuple) f);
                else if (f instanceof DataBag)
                    accumulate((DataBag) f);
                else
                    throw new IOException("tuple input is not a tuple or a databag... x__x");
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    @Override
    public DataBag getValue() {
        if (points == null)
            points = new ArrayList<Point>();
        Collections.sort(points);
        DataBag myBag = BagFactory.getInstance().newDefaultBag();
        for (Point pt : points) {
            Measure sm = new Measure(pt);
            myBag.add(sm.asTuple());
        }
        return myBag;
    }

    public void cleanup() {
        points = null;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema tupleFs
                = new Schema.FieldSchema("output_tuple", Measure.smSchema(), DataType.TUPLE);
            Schema bagSchema = new Schema(tupleFs);
            Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples", bagSchema, DataType.BAG);
            return new Schema(bagFs);
        } catch (Exception e) {
            return null;
        }
    }

    public static void storePt(Tuple f) {
        Object[] field = f.getAll().toArray();
        Point pt = new Point(
            field[0] == null ? 0 : (Long) field[0],
            field[1] == null ? 0 : (Long) field[1],
            field[2] == null ? 0 : (Double) field[2],
            field[3] == null ? 0 : (Double) field[3],
            field[4] == null ? Double.MIN_VALUE : (Double) field[4]
        );
        if (points == null)
            points = new ArrayList<Point>();
        points.add(pt);
    }
}
Point.java:
package test;

public class Point implements Comparable<Point> {

    long id;
    long time;
    double lat;
    double lon;
    double alt;

    public Point(Point c) {
        this.id = c.id;
        this.time = c.time;
        this.lat = c.lat;
        this.lon = c.lon;
        this.alt = c.alt;
    }

    public Point(long l, long m, double d, double e, double f) {
        id = l;
        time = m;
        lat = d;
        lon = e;
        alt = f;
    }

    @Override
    public int compareTo(Point other) {
        final int BEFORE = -1;
        final int EQUAL = 0;
        final int AFTER = 1;
        if (this == other) return EQUAL;
        if (this.id < other.id) return BEFORE;
        if (this.id > other.id) return AFTER;
        if (this.time < other.time) return BEFORE;
        if (this.time > other.time) return AFTER;
        if (this.lat > other.lat) return BEFORE;
        if (this.lat < other.lat) return AFTER;
        if (this.lon > other.lon) return BEFORE;
        if (this.lon < other.lon) return AFTER;
        if (this.alt > other.alt) return BEFORE;
        if (this.alt < other.alt) return AFTER;
        return EQUAL;
    }

    public String toString() {
        return id + " " + time;
    }
}
Measure.java:
package test;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Measure {

    private long id;
    private long time;
    private double lat;
    private double lon;
    private double alt;

    public Measure(Point pt) {
        id = pt.id;
        time = pt.time;
        lat = pt.lat;
        lon = pt.lon;
        alt = pt.alt;
    }

    public Tuple asTuple() {
        Tuple myTuple = TupleFactory.getInstance().newTuple();
        myTuple.append(id);
        myTuple.append(time);
        myTuple.append(lat);
        myTuple.append(lon);
        myTuple.append(alt);
        return myTuple;
    }

    public static Schema smSchema() {
        Schema tupleSchema = new Schema();
        tupleSchema.add(new Schema.FieldSchema("id", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("time", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("lat", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("lon", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("alt", DataType.DOUBLE));
        return tupleSchema;
    }
}
【Comments】:
Can you post the code of your UDF?
I can't easily post my code. For work reasons I can't share the actual code, and it's too large to sanitize for sharing. In which part would you expect to find the error? The Schema, how the output bag is built, or...? If I can't find a solution, I'll write a small UDF to try to reproduce the error in a clean context.
Post the outputSchema method (if defined), the signature of the exec method, and how the output bag is built.
The full code has been added, so you can now see it (along with everything else).
Does your code run if you don't use FLATTEN and just DUMP U_out?
【Answer 1】:
The workaround is to cast the UDF's return value to the matching bag type:
U_out = FOREACH A_grouped
GENERATE FLATTEN(
(bag{tuple(long,long,double,double,double)})(test.Test(A))
) AS (id:long, time:long, lat:double, lon:double, alt:double);
Even though the schema returned by the UDF is correct, the output still has to be cast before everything works properly.
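The same workaround should apply to any UDF that returns a bag of tuples: wrap the UDF call in a bag cast before FLATTEN-ing it. A minimal sketch of the general shape (the UDF name, alias names, and field types below are placeholders, not from the original script):
X = FOREACH grouped GENERATE FLATTEN(
        (bag{tuple(long, chararray)}) (my.udfs.MyBagUdf(things))
    ) AS (key:long, label:chararray);
With the cast in place in the script above, the job should complete and test.out should contain the eight (id, time) pairs from the sample input, from 0,1000 through 1,1130.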
【Discussion】: