在 Oozie 工作流中的 MapReduce 作业中设置 Reducer 的数量

Posted

技术标签:

【中文标题】在 Oozie 工作流中的 MapReduce 作业中设置 Reducer 的数量【英文标题】:Setting the Number of Reducers in a MapReduce job which is in an Oozie Workflow 【发布时间】:2014-02-13 02:26:42 【问题描述】:

我有一个五节点集群,其中三个节点包含 DataNodes 和 TaskTracker。

我通过 Sqoop 从 Oracle 导入了大约 1000 万行,并在 Oozie 工作流中通过 MapReduce 对其进行处理。

MapReduce 作业大约需要 30 分钟,并且只使用了一个 reducer。

编辑 - 如果我单独运行 MapReduce 代码,与 Oozie 分开,job.setNumReduceTasks(4) 会正确建立 4 个减速器。

我尝试了以下方法手动将reducer的数量设置为四个,没有成功:

在Oozie中,在map reduce节点的tag中设置如下属性:

<property><name>mapred.reduce.tasks</name><value>4</value></property>

在 MapReduce java 代码的 Main 方法中:

Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
job.setNumReduceTasks(4);

我也试过了:

Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
conf.set("mapred.reduce.tasks", "4");

我的地图功能看起来像这样:

public void map(Text key, Text value, Context context) 
    CustomObj customObj = new CustomObj(key.toString());
    context.write(new Text(customObj.getId()), customObj);  

我认为 ID 有大约 80,000 个不同的值。

我的 Reduce 函数如下所示:

public void reduce(Text key, Iterable<CustomObj> vals, Context context) 
    OtherCustomObj otherCustomObj = new OtherCustomObj();
    ...
    context.write(null, otherCustomObj);

Mapper 中发出的自定义对象实现了 WritableComparable,但 Reducer 中发出的其他自定义对象没有实现 WritableComparable。

这里是有关系统计数器、作业计数器和 map-reduce 框架的日志,其中指定仅启动了一个 reduce 任务。

 map 100% reduce 100%
 Job complete: job_201401131546_0425
 Counters: 32
   File System Counters
     FILE: Number of bytes read=1370377216
     FILE: Number of bytes written=2057213222
     FILE: Number of read operations=0
     FILE: Number of large read operations=0
     FILE: Number of write operations=0
     HDFS: Number of bytes read=556345690
     HDFS: Number of bytes written=166938092
     HDFS: Number of read operations=18
     HDFS: Number of large read operations=0
     HDFS: Number of write operations=1
   Job Counters 
     Launched map tasks=11
     Launched reduce tasks=1
     Data-local map tasks=11
     Total time spent by all maps in occupied slots (ms)=1268296
     Total time spent by all reduces in occupied slots (ms)=709774
     Total time spent by all maps waiting after reserving slots (ms)=0
     Total time spent by all reduces waiting after reserving slots (ms)=0
   Map-Reduce Framework
     Map input records=9440000
     Map output records=9440000
     Map output bytes=666308476
     Input split bytes=1422
     Combine input records=0
     Combine output records=0
     Reduce input groups=80000
     Reduce shuffle bytes=685188530
     Reduce input records=9440000
     Reduce output records=2612760
     Spilled Records=28320000
     CPU time spent (ms)=1849500
     Physical memory (bytes) snapshot=3581157376
     Virtual memory (bytes) snapshot=15008251904
     Total committed heap usage (bytes)=2848063488

编辑:我修改了 MapReduce 以引入自定义分区器、排序比较器和分组比较器。出于某种原因,代码现在启动了两个 reducer(当通过 Oozie 调度时),而不是四个。

我在每个 TaskTracker(和 JobTracker)上将 mapred.tasktracker.map.tasks.maximum 属性设置为 20,重新启动它们但没有结果。

【问题讨论】:

手动将自定义分区器设置为4,在实现方法中根据某些条件将ID分为4部分。这只是为了测试是否有 4 个 partition/reducer 正在执行。 您使用的 Hadoop 版本是什么?检查您用于设置 reducer 的属性是否对该版本有效 【参考方案1】:

作为起点,mapred-site.xml 中以下属性的值是什么

<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
</property>

【讨论】:

我的集群中每个节点上的 mapred-site.xml 都没有设置。 那么你可能会得到 2 个或 1 个减速器(取决于该版本的默认减速器#)。考虑使用 rsync 将 mapred-site.xml 推送到从节点 好的,在我按照您的指示之前,我自己测试了 MR 代码,并且能够启动 4 个减速器。接下来,我在我的 MapReduce 代码中添加了一个自定义分区器、排序比较器和一个分组比较器,并通过 Oozie 安排了它,它设法将 reducer 的数量增加到 2 个。最后我按照你的说明,rsync 并验证每个 TaskTracker (和 JobTracker)的 mapred.tasktracker.map.tasks.maximum 为 20,但 oozie 工作流仍然只启动两个减速器。 看起来你已经为 oozie 添加了正确的设置,所以目前我没有其他建议。

以上是关于在 Oozie 工作流中的 MapReduce 作业中设置 Reducer 的数量的主要内容,如果未能解决你的问题,请参考以下文章

Oozie 触发 MapReduce 主类

MapReduce 工作流基准

Hadoop Oozie MapReduce 操作自定义分区器

如何使用 hadoop-2.2 在 oozie 中执行 mapreduce 程序

工作流引擎Oozie:workflow

Oozie&Azkaban区别