Hadoop Pig Output Directory not set
Asked: 2013-02-06 01:23:02

Question: I am writing my own Pig Store class. I don't want to store the output in a file; I plan to send it to a third-party data store instead (minus the actual API calls).
Note: I am running this on Cloudera's VirtualBox image.
I have written my Java class (listed below) and created mystore.jar, which I use in the id.pig script below:
store B INTO 'mylocation' USING MyStore('mynewlocation')
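For context, a minimal id.pig exercising this store function might look like the following sketch; the input file name and schema here are assumptions, not from the original script:

```pig
REGISTER mystore.jar;                   -- jar containing MyStore and MyStoreOutputFormat
A = LOAD 'id.txt' AS (id:chararray);    -- hypothetical input file and schema
B = FOREACH A GENERATE id;
STORE B INTO 'mylocation' USING MyStore('mynewlocation');
```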
When I run this script with Pig, I see the following error:

ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out' More info to follow: Output directory not set.

org.apache.pig.impl.plan.VisitorException: ERROR 6000:
    at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)
Please help!
-------- MyStore.java ----------

public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;

    public MyStore() {
        location = null;
    }

    public MyStore(String location) {
        this.location = location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // write tuple to location
        try {
            writer.write(null, tuple.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if (location != null)
            this.location = location;
    }
}
-------- MyStoreOutputFormat.java ----------

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {
    private String location = null;

    public MyStoreOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        String extension = location;
        Path file = getDefaultWorkFile(job, extension);

        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {
        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the location
            if (out != null)
                out.close();
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {
            // write the data to location
            if (out != null)
                out.writeChars(value.toString()); // will be calling an API later; for now just dump to the location!
        }
    }
}
Am I missing something here?
Comments:

Please help. I need this urgently. Thanks!

Answer 1:

First, I think you should store the location value in the Job configuration rather than in an instance variable.
The assignment to the instance variable 'location' in your setStoreLocation method happens while the job is being planned on the front end, but the call to getOutputFormat may not come until the execution phase, by which time the location may no longer be set (a new instance of your class may have been created).
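To see why the instance field is unreliable, here is a plain-Java sketch of that lifecycle (no Hadoop required): a HashMap stands in for Hadoop's Configuration, and a freshly constructed object stands in for the new StoreFunc instance Pig creates on the back end. The class and key names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Simulates Pig re-instantiating a StoreFunc between front-end planning
// and back-end execution: instance fields set during planning are lost,
// while values written into the shared job configuration survive.
public class LocationDemo {
    // stands in for Hadoop's Configuration, which is shipped to the tasks
    static final Map<String, String> JOB_CONF = new HashMap<>();

    String location; // instance field: not carried over to the back end

    void setStoreLocation(String loc) {
        this.location = loc;              // lost when a new instance is made
        JOB_CONF.put("mylocation", loc);  // survives in the job configuration
    }

    static String backendLocation() {
        // Pig creates a *new* instance of the store func on the back end
        LocationDemo backendCopy = new LocationDemo();
        // backendCopy.location is null here; only the conf value remains
        return backendCopy.location != null
                ? backendCopy.location
                : JOB_CONF.get("mylocation");
    }

    public static void main(String[] args) {
        new LocationDemo().setStoreLocation("mynewlocation");
        System.out.println(backendLocation()); // prints "mynewlocation"
    }
}
```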
If you look at the source of PigStorage.setStoreLocation, you'll notice that it stores the location in the Job configuration (second line):
@Override
public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));
    if ("true".equals(job.getConfiguration().get("output.compression.enabled"))) {
        FileOutputFormat.setCompressOutput(job, true);
        String codec = job.getConfiguration().get("output.compression.codec");
        try {
            FileOutputFormat.setOutputCompressorClass(job,
                    (Class<? extends CompressionCodec>) Class.forName(codec));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec);
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}
So I think you should store the location in the job configuration:
@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location != null)
        job.getConfiguration().set("mylocation", location);
}
Then your custom output format can extract it in its getRecordWriter method:
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);

    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);
    return new MyStoreRecordWriter(fileOut);
}
Finally (and probably the actual cause of the error you're seeing), your output format extends TextOutputFormat, and your record writer uses the getDefaultWorkFile method. This method needs to know where in HDFS you are outputting the file, but you never call FileOutputFormat.setOutputPath(job, new Path(location)); in your setStoreLocation method (see the PigStorage.setStoreLocation method I pasted earlier). The error occurs because it doesn't know where to create the default work file.
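Putting both points together, a corrected setStoreLocation might look roughly like this sketch (using the same assumed "mylocation" key as above): it records the target for the output format and also sets the output path that getDefaultWorkFile and Pig's output-location validation need.

```java
@Override
public void setStoreLocation(String location, Job job) throws IOException {
    // make the location visible to MyStoreOutputFormat on the back end
    job.getConfiguration().set("mylocation", location);
    // required so TextOutputFormat.getDefaultWorkFile can resolve a path,
    // and so Pig's output-location validation passes
    FileOutputFormat.setOutputPath(job, new Path(location));
}
```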
Comments:

Thanks Chris. I had missed the FileOutputFormat.setOutputPath(job, new Path(location)); call. I changed my code based on your input.