Hadoop3 - MapReduce Join 关联注意点

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce Join 关联注意点相关的知识,希望对你有一定的参考价值。

一、MapReduce Join 关联注意点

在使用 MapReduce 处理数据的时候,难免其中部分数据在其他文件中存在,因此就可能出现类似 DB 中的 Join 关联操作。

比如:有以下数据,分别表示,date(日期),county(县),stateId(州ID),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)

28/1/2021 00:00:00	Autauga	1	01001	5554	69
28/1/2021 00:00:00	Piatt	15	17147	1208	18
28/1/2021 00:00:00	Emanuel	11	13107	2283	65
28/1/2021 00:00:00	Clay	18	20027	760	22

其中 stateId(州ID) 又对应另一个文件的 ID

1	Alabama
15	Illinois
11	Georgia
18	Kansas

如果需要得到完整的州对应的数据,就需要进行两个文件的Join 关联。

在 MapReduce 中分为 Map 和 Reduce 两个阶段,两个阶段都可以对数据进行处理,也就可以通过一些技术手段对数据进行 Join 操作。

Reduce 阶段 Join

Reduce 阶段 Join 进行处理是非常容易实现的,可以将多个文件共同的 join 字段作为 KEY,一起发往 Reduce 阶段,在 Reduce 中再进行数据的整合,比如下方的操作:

@Slf4j
public class CovidJoinMapper extends Mapper<LongWritable, Text, Text, CountVO> 

    Text outKey = new Text();
    CountVO outValue = new CountVO();
    String filename = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        filename = inputSplit.getPath().getName();
        log.info("currentFile: ", filename);
    

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split("\\t");
        if (filename.contains("covid_input.txt")) 
            outKey.set(fields[2]);
            outValue.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
            context.write(outKey, outValue);
         else 
            outKey.set(fields[0]);
            outValue.set(Integer.parseInt(fields[0]), fields[1], "state");
            context.write(outKey, outValue);
        
    

public class CovidJoinReducer extends Reducer<Text, CountVO, CountVO, NullWritable> 

    NullWritable outValue = NullWritable.get();

    @Override
    protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException 
        List<CountVO> covidList = new ArrayList<>();
        List<CountVO> stateList = new ArrayList<>();
        for (CountVO value : values) 
            if (Objects.equals(value.getType(), "covid_input")) 
                covidList.add(new CountVO(value));
             else 
                stateList.add(new CountVO(value));
            
        
        // 对 covidList 根据 stateId 补充 state
        covidList.stream().filter(Objects::nonNull).peek(vo ->
                vo.setState(stateList.stream()
                        .filter(Objects::nonNull)
                        .filter(s -> Objects.equals(s.getStateId(), vo.getStateId()))
                        .map(CountVO::getState)
                        .findFirst().orElse(null))
        ).forEach(vo -> 
            try 
                context.write(vo, outValue);
             catch (Exception e) 
                e.printStackTrace();
            
        );
    

上面虽然实现了数据的整个,但是 Reduce 端 join最大的问题是整个 join 的工作是在 Reduce 阶段完成的,但是通常情况下MapReduce中 Reduce 的并行度是极小的(默认是1个),这就使得所有的数据都挤压到 Reduce 阶段处理,压力颇大,极易出现数据倾斜现象。

下面继续看如果在 Map 阶段进行数据的 Join:

Map 阶段 Join

Map 阶段 Join 处理,则避免了 shuffle 时候的繁琐,同时 Reduce 阶段的压力也会减小,但是 Map 阶段会根据文件的大小拆分成多个 mapTask 分开并行处理,每个 mapTask 都需要进行数据的 Join 处理,那每个 mapTask 都对被 Join 文件进行完整的 IO 读取也会造成性能低下,不过在 Mapreduce 中有提供分布式缓存机制,可以将指定文件缓存到各个mapTask 运行的机器上,实践操作如下:

在驱动类中声明 job 时,指定缓存文件:

 job.addCacheFile(new URI("/test/input1/state.txt"));

在 Mapper 中,可以读取该文件解析成 Map 使用:

@Slf4j
public class CovidJoinMapper extends Mapper<LongWritable, Text, CountVO, NullWritable> 

    Map<Integer, String> stateMap = new HashMap<>();
    CountVO outKey = new CountVO();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
        //读取缓存文件,直接指定文件名
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("state.txt")));
        String line = null;
        while ((line = br.readLine()) != null) 
            String[] fields = line.toString().split("\\t");
            stateMap.put(Integer.parseInt(fields[0]), fields[1]);
        
    

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split("\\t");
        outKey.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
        outKey.setState(stateMap.get(Integer.parseInt(fields[2])));
        context.write(outKey, outValue);
    

这种方式虽然可以提升处理的速度,但也有缺陷,如果缓存文件大的话,势必会造成不必要的内存浪费,这种情况可以考虑引入第三方工具,Redis、HBase 等 NoSql ,将被 Join 的数据存至 NoSql 中,在 Mapper 中查找相关数据进行整合,然后给到 Reduce 聚合处理。

以上是关于Hadoop3 - MapReduce Join 关联注意点的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 并行机制

Hadoop3 - MapReduce 并行机制

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce COVID-19 案例实践

Hadoop3 - MapReduce COVID-19 案例实践