Hadoop Pig Output Directory not set

Posted: 2013-02-06 01:23:02

Question:

I am writing my own Pig Store class. I don't want to store the output in a file; I intend to send it to a third-party data store (the actual API calls are still missing).

Note: I am running this on Cloudera's VirtualBox image.

I have written my Java classes (listed below) and created mystore.jar, which I use in the id.pig script below:

store B INTO 'mylocation' USING MyStore('mynewlocation')
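
For context, the complete id.pig would look something like the sketch below. The REGISTER line, the LOAD path, and the projected column are illustrative assumptions (only the STORE line comes from the original script):

REGISTER mystore.jar;  -- assumed: make MyStore visible to the script
A = LOAD '/home/cloudera/test/id' USING PigStorage(',');  -- assumed input path and loader
B = FOREACH A GENERATE $0;  -- assumed: some relation B for the STORE below
store B INTO 'mylocation' USING MyStore('mynewlocation');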

When I run this script with Pig, I see the following error:

ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out More info to follow: Output directory not set.

org.apache.pig.impl.plan.VisitorException: ERROR 6000:
at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)

Please help!

-------- MyStore.java ----------

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;

    public MyStore() {
        location = null;
    }

    public MyStore(String location) {
        this.location = location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // write the tuple to the location; the record writer expects the
        // Tuple itself, not its String form
        try {
            writer.write(null, tuple);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if (location != null)
            this.location = location;
    }
}


-------- MyStoreOutputFormat.java ----------

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {

    private String location = null;

    public MyStoreOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        String extension = location;
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream fileOut = fs.create(file, false);

        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {

        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the underlying stream so the output gets flushed
            if (out != null) {
                out.close();
            }
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {
            // write the data to the location
            if (out != null) {
                out.writeChars(value.toString()); // will be calling an API later; for now just dump to the location!
            }
        }
    }
}

Am I missing something here?

Comments:

Please help. I need this urgently. Thanks!

Answer 1:

Firstly, I think you should be using the Job configuration to store the location value, rather than an instance variable.

The assignment to the variable 'location' in the setStoreLocation method happens when the job is being planned, but getOutputFormat may not be called until the execution phase, by which time the location variable may no longer be set (a new instance of your class may have been created).

If you look at the source of PigStorage.setStoreLocation, you should notice that it stores the location in the Job configuration (2nd line):

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));

    if ("true".equals(job.getConfiguration().get("output.compression.enabled"))) {
        FileOutputFormat.setCompressOutput(job, true);
        String codec = job.getConfiguration().get("output.compression.codec");
        try {
            FileOutputFormat.setOutputCompressorClass(job,
                    (Class<? extends CompressionCodec>) Class.forName(codec));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec);
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}

So I think you should store the location in the job configuration instead:

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location != null)
        job.getConfiguration().set("mylocation", location);
}

Your custom output format can then extract it in the getRecordWriter method:

@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {

    Configuration conf = job.getConfiguration();

    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);

    FSDataOutputStream fileOut = fs.create(file, false);

    return new MyStoreRecordWriter(fileOut);
}

Finally (and probably the actual cause of the error you're seeing): your output format extends TextOutputFormat, and you use the getDefaultWorkFile method in the record writer. This method needs to know where in HDFS you are outputting the file, but you never call FileOutputFormat.setOutputPath(job, new Path(location)); in your setStoreLocation method (see the PigStorage.setStoreLocation method pasted above). So the error occurs because it doesn't know where to create the default work file.
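
Putting the two fixes together, a corrected setStoreLocation might look like the sketch below. This is a sketch only: the configuration key "mylocation" follows the answer's example, and the imports for Path and FileOutputFormat are assumed to be in place.

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location != null) {
        // keep the location in the job configuration so it survives from
        // job planning (front end) to execution (back end)
        job.getConfiguration().set("mylocation", location);
    }
    // tell the FileOutputFormat machinery where the output lives in HDFS;
    // without this call, getDefaultWorkFile fails with "Output directory not set"
    FileOutputFormat.setOutputPath(job, new Path(location));
}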

Comments:

Thanks Chris. I was missing the 'FileOutputFormat.setOutputPath(job, new Path(location));' call. Changed my code based on your input.
