如何将不同的数据集传递给同一作业的两个不同的映射器

Posted

技术标签:

【中文标题】如何将不同的数据集传递给同一作业的两个不同的映射器【英文标题】:How to pass different set of data to two different mappers of the same job 【发布时间】:2015-09-23 10:35:30 【问题描述】:

我有一个 Single Mapper ,比如 SingleGroupIdentifierMapper.java

现在这是一个通用映射器,它根据使用缓存从驱​​动程序类传递给它的属性文件(包含过滤器和键值字段索引)对单行映射器输入值/记录进行所有过滤。

只有 reducer 业务逻辑不同,并且已经实现了映射器逻辑的通用性,并使用上面提到的 PropertyFile 实现。

现在我的问题陈述是我现在有来自多个来源的输入,具有不同的格式。这意味着我必须做一些事情,比如

 MultipleInputs.addInputPath(conf, new Path("/inputA"),TextInputFormat.class, SingleGroupIdentifierMapper.class);
MultipleInputs.addInputPath(conf, new Path("/inputB"),TextInputFormat.class, SingleGroupIdentifierMapper.class);

但是我从驱动程序类传递给映射器以实现基于字段索引的过滤器的缓存属性文件是常见的,那么我如何将两个不同的属性文件传递给同一个映射器,如果它在哪里处理,比如输入 A ,然后它将使用 PropertyFileA(过滤和创建键值对),如果它处理,比如 Input B,那么它将使用 PropertyFileB(过滤和创建键值对)。

可以更改 Mapper 的通用代码来处理这种情况,但是如何在通用类中解决这个问题,以及如果输入来自 inputA/inputB,如何在同一个 Mapper 类中识别并相应地应用数据上的propertyFile配置。

我们可以将参数传递给这个映射器类的构造函数,以指定它来自 inputB 还是需要读取缓存中的哪个属性文件?

例如:

MultipleInputs.addInputPath(conf, new Path("/inputB"),TextInputFormat.class, args[], SingleGroupIdentifierMapper.class);

其中 args[] 被传递给 SingleGroupIdentifierMapper 类的构造函数,我们将其定义为输入并将其设置为属性。

欢迎任何想法或专业知识。

希望我能够清楚地表达我的问题,如果问题需要更清楚,请询问我。

提前致谢, 干杯:)

【问题讨论】:

因此您将拥有 n 个输入文件路径和 n 个缓存的属性文件。您打算将它们全部作为部分 jar 命令发送? 我将它们作为 jobConf.setString(configFileKey,JsonStringOfConfigFile) 发送,其中 JsonStringOfConfigFile 只不过是配置文件的键值对的 JsonString,我在 Common-Mapper 中将其转换为对象,获取多个输入来自不同的文件夹 是否可以在我的 Mapper 中获取输入文件路径?! 【参考方案1】:

不幸的是,MultipleInputs 不是那么灵活。但是有一种解决方法可以将 InputSplit 路径与 Mapper 的 setup 方法中的属性文件匹配。如果您没有使用任何类型的 Combine*Format,那么单个映射器将处理来自单个文件的单个拆分:

    将 prop 文件添加到缓存时,请使用 /propfile_1#PROPS_A 和 /propfile_2#PROPS_B 在 job.getConfiguration().set("PROPS_A", "/inputA") 和 job.getConfiguration().set("PROPS_B", "/inputB") 中添加输入路径 在 Mapper.setup(Context context) 方法中,使用 context.getInputSplit().toString() 获取拆分的路径。将其与保存在 context.getConfiguration().get("PROPS_A") 或 PROPS_B 中的路径匹配

如果您使用的是一些 Combine*Format,则需要对其进行扩展,覆盖使用来自 JobContext 的信息的 getSplits 来构建 PathFilter[] 并调用 createPool,这将创建包含来自同一组的文件的拆分 (输入 A 或输入 B)。

【讨论】:

以上是关于如何将不同的数据集传递给同一作业的两个不同的映射器的主要内容,如果未能解决你的问题,请参考以下文章

具有单个映射器和两个不同减速器的 hadoop 作业

使用 MapReduce/Hadoop 对大数据进行排序

将文件从 AWS EMR 集群中的映射器上传到 S3

如何在MR作业中配置映射以批量执行?

根据映射器代码中的某些逻辑,将映射器中的一些数据(行)写入单独的目录

如何创建从实体到 dto 的映射器,其中 dto 嵌套在哪里?