Error while reading avro data file in oozie workflow - class is incompatible with new map API mode

Posted: 2014-08-07 10:07:04

I am trying to run an MR job from an Oozie workflow, with Avro data files as both input and output. The mapper emits Text and IntWritable. I am using the new MR API - mapreduce. My workflow definition is as follows:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
<global>
   <job-tracker>${jobTracker}</job-tracker>
   <name-node>${nameNode}</name-node>
   <configuration>
        <property>
            <name>mapreduce.job.queuename</name>
            <value>${queueName}</value>
        </property>
    </configuration>
</global>

    <start to="mr-node"/>

    <action name="mr-node">
        <map-reduce>
            <prepare>
                <delete path="${nameNode}/${outputDir}"/>
            </prepare>
            <configuration>
                <!-- BEGIN: SNIPPET TO ADD IN ORDER TO MAKE USE OF NEW HADOOP API -->
                <property>
                  <name>mapred.reducer.new-api</name>
                  <value>true</value>
                </property>
                <property>
                  <name>mapred.mapper.new-api</name>
                  <value>true</value>
                </property>
                <!-- END: SNIPPET -->
                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${nameNode}/${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${nameNode}/${outputDir}</value>
                </property>                 
                <property>
                    <name>mapred.input.format.class</name>
                    <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
                </property>
                <property>
                    <name>avro.schema.input.key</name>
                    <value>{"type":"record","name":"SampleRecord","namespace":"org.co.sample.etl.domain","fields":[{"name":"requiredName","type":"string"},{"name":"optionalName","type":["null","string"]},{"name":"dataItemLong","type":"long"},{"name":"dataItemInt","type":"int"},{"name":"startTime","type":"long"},{"name":"endTime","type":"long"}]}</value>
                </property>


                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.avro.mapreduce.AvroKeyValueOutputFormat</value>
                </property> 
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.avro.mapred.AvroKey</value>
                </property>   
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.avro.mapred.AvroValue</value>
                </property> 

                <property>
                    <name>avro.schema.output.key</name>
                    <value>string</value>
                </property> 
                <property>
                    <name>avro.schema.output.value</name>
                    <value>int</value>
                </property> 

            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

My mapper looks like this:

import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

 public static class PifMapper extends Mapper<AvroKey<PosData>, NullWritable, Text, IntWritable> {

        @Override
        public void map(AvroKey<PosData> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
        ...
        }
 }

I get the following error:

140807041959771-oozie-oozi-W@mr-node] Launcher exception: mapred.input.format.class is incompatible with new map API mode.
java.io.IOException: mapred.input.format.class is incompatible with new map API mode.
    at org.apache.hadoop.mapreduce.Job.ensureNotSet(Job.java:1172)
    at org.apache.hadoop.mapreduce.Job.setUseNewAPI(Job.java:1198)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1261)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
...
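The stack trace shows where the job dies: `Job.setUseNewAPI()` calls `ensureNotSet(...)` for each old-API key once the `mapred.*.new-api` flags are true. A rough, self-contained sketch of that validation (a hypothetical `NewApiCheck` class, not Hadoop source code) illustrates why setting `mapred.input.format.class` together with the new-api flags fails:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the check performed during job submission: when the
// new-api flags are enabled, old-API class keys must be absent.
public class NewApiCheck {
    static void ensureNotSet(Map<String, String> conf, String key) throws IOException {
        if (conf.containsKey(key)) {
            throw new IOException(key + " is incompatible with new map API mode.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapred.mapper.new-api", "true");
        conf.put("mapred.input.format.class", "org.apache.avro.mapreduce.AvroKeyInputFormat");
        try {
            if ("true".equals(conf.get("mapred.mapper.new-api"))) {
                ensureNotSet(conf, "mapred.input.format.class");
            }
            System.out.println("ok");
        } catch (IOException e) {
            // prints: mapred.input.format.class is incompatible with new map API mode.
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, the error is not about the format class itself being old-API; it is about the *property name* used to set it.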

I am using Hadoop 2.2.0 (HDP 2.0), Oozie 4.0.0, Avro 1.7.4.

The same map reduce job works fine when submitted through a driver class. And org.apache.avro.mapreduce.AvroKeyInputFormat is supposed to be the implementation for the new mapreduce API.
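For reference, a driver along these lines works because everything is set through the new-API `Job` object, so no old-style `mapred.*` class keys ever land in the configuration. This is only a sketch (it assumes `PosData` is an Avro-generated class exposing `getClassSchema()`, and that `PifMapper`/`PifReducer` are on the classpath); it is not the exact driver from the job:

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroPifDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "avro-pif");
        job.setJarByClass(AvroPifDriver.class);

        // All classes are registered via the new-API Job object,
        // so the old mapred.* class keys are never set.
        job.setMapperClass(PifMapper.class);
        job.setReducerClass(PifReducer.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);

        // Avro schemas for the input key and the output key/value pair.
        AvroJob.setInputKeySchema(job, PosData.getClassSchema());
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        // Intermediate (map output) types emitted by PifMapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The Oozie `<map-reduce>` action has to express this same setup purely through configuration properties, which is where the naming mismatch comes in.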

To make sure there are no library conflicts, I removed the shared libs from Oozie and included all libraries in the workflow's lib directory.

Any hints?

Comments:

Answer 1:

It was just a matter of finding the right property names for the job configuration. The outdated documentation is somewhat misleading. The following works for us:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
<global>
   <job-tracker>${jobTracker}</job-tracker>
   <name-node>${nameNode}</name-node>
   <configuration>
        <property>
            <name>mapreduce.job.queuename</name>
            <value>${queueName}</value>
        </property>
    </configuration>
</global>

    <start to="mr-node"/>

    <action name="mr-node">
        <map-reduce>
            <prepare>
                <delete path="${nameNode}/${outputDir}"/>
            </prepare>
            <configuration>
                <!-- BEGIN: SNIPPET TO ADD IN ORDER TO MAKE USE OF NEW HADOOP API -->
                <property>
                  <name>mapred.reducer.new-api</name>
                  <value>true</value>
                </property>
                <property>
                  <name>mapred.mapper.new-api</name>
                  <value>true</value>
                </property>
                <!-- END: SNIPPET -->
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${outputDir}</value>
                </property>                 
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
                </property>

                <property>
                    <name>avro.schema.input.key</name>
                    <value>{"type":"record","name":"SampleRecord","namespace":"org.co.sample.etl.domain","fields":[{"name":"requiredName","type":"string"},{"name":"optionalName","type":["null","string"]},{"name":"dataItemLong","type":"long"},{"name":"dataItemInt","type":"int"},{"name":"startTime","type":"long"},{"name":"endTime","type":"long"}]}</value>
                </property>


                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.avro.mapreduce.AvroKeyValueOutputFormat</value>
                </property> 



                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>   
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property> 


                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.avro.mapred.AvroKey</value>
                </property>   
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.avro.mapred.AvroValue</value>
                </property> 

                <property>
                    <name>avro.schema.output.key</name>
                    <value>"string"</value>
                </property> 
                <property>
                    <name>avro.schema.output.value</name>
                    <value>"int"</value>
                </property> 

            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Comments:

Answer 2:

I think this will help you: how to execute mapreduce programs in oozie with hadoop 2.2

Comments:
