hadoop mapreduce开发实践文件合并(join)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop mapreduce开发实践文件合并(join)相关的知识,希望对你有一定的参考价值。

有两份不同的文件,他们有相同的键,把他们合并成一个文件;

1、文件内容

  • 合并前的文件a.txt和b.txt
$ head a.txt b.txt 
==> a.txt <==
aaa1    hdfs
aaa2    hdfs
aaa3    hdfs
aaa4    hdfs
aaa5    hdfs
aaa6    hdfs
aaa7    hdfs
aaa8    hdfs
aaa9    hdfs
aaa10   hdfs

==> b.txt <==
aaa1    mapreduce
aaa2    mapreduce
aaa3    mapreduce
aaa4    mapreduce
aaa5    mapreduce
aaa6    mapreduce
aaa7    mapreduce
aaa8    mapreduce
aaa9    mapreduce
aaa10   mapreduce

2、思路

1)、首先分别对a.txt和b.txt做一个map标签处理(用于区分a.txt和b.txt上的数据);
2)、mapjoin用于输出做过标签处理的a.txt和b.txt;
3)、用一个reducejoin的程序做类似wordcount处理,相同的key放在一起,把a.txt和b.txt上的value放在key后面输出;

3、实现

3.1、创建目录和上传数据

$ hadoop fs -mkdir /input/join
$ hadoop fs -mkdir /output/join/
$ hadoop fs -put a.txt b.txt /input/join

3.2、 mapperA程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        #print wordline

        print "%s\ta\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.3、mapperB程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]

        print "%s\tb\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.4、mapperjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        print line.strip()

if __name__ == "__main__":
    mapper()

3.5、reducerjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def reducer():
    valueA = ‘‘
    for line in sys.stdin:
        wordkey, flag, wordvalue = line.strip().split(‘\t‘)
        if flag == ‘a‘:
            valueA = wordvalue
        elif flag == ‘b‘:
            valueB = wordvalue
            print "%s\t%s\t%s" % (wordkey,valueA,valueB)
            valueA = ‘‘

if __name__ == "__main__":
    reducer()

3.6、run_streaming程序

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH_A="/input/join/a.txt"
INPUT_FILE_PATH_B="/input/join/b.txt"

OUTPUT_FILE_PATH_A="/output/join/a"
OUTPUT_FILE_PATH_B="/output/join/b"
OUTPUT_FILE_JOIN_PATH="/output/join/abjoin"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_A
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_B
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_JOIN_PATH

# step1: map a
$HADOOP_CMD jar $STREAM_JAR_PATH                 -input $INPUT_FILE_PATH_A                 -output $OUTPUT_FILE_PATH_A                 -jobconf "mapred.job.name=joinfinemapA"                 -mapper "python mapperA.py"                 -file "./mapperA.py"
# step2: map b
$HADOOP_CMD jar $STREAM_JAR_PATH                 -input $INPUT_FILE_PATH_B                 -output $OUTPUT_FILE_PATH_B                 -jobconf "mapred.job.name=joinfinemapB"                 -mapper "python mapperB.py"                 -file "./mapperB.py"

# step3: join
$HADOOP_CMD jar $STREAM_JAR_PATH                 -input $OUTPUT_FILE_PATH_A,$OUTPUT_FILE_PATH_B                 -output $OUTPUT_FILE_JOIN_PATH                 -mapper "python mapperjoin.py"                 -reducer "python reducerjoin.py"                 -jobconf "mapred.job.name=joinfinemapAB"                 -jobconf "stream.num.map.output.key.fields=2"                 -jobconf "num.key.fields.for.partition=1"                 -file "./reducerjoin.py"                 -file "./mapperjoin.py"

3.7、执行程序

$ ./run_streamingab.sh 

...中间省略...
18/02/05 10:43:13 INFO streaming.StreamJob: Output directory: /output/join/a

...中间省略...

18/02/05 10:43:42 INFO streaming.StreamJob: Output directory: /output/join/b

...中间省略...

18/02/05 10:44:12 INFO streaming.StreamJob: Output directory: /output/join/abjoin

3.8、查看结果

$ hadoop fs -ls /output/join/abjoin
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-02-05 10:44 /output/join/abjoin/_SUCCESS
-rw-r--r--   1 hadoop supergroup       6276 2018-02-05 10:44 /output/join/abjoin/part-00000
$ hadoop fs -text /output/join/abjoin/part-00000|head
aaa1    hdfs    mapreduce
aaa10   hdfs    mapreduce
aaa100  hdfs    mapreduce
aaa11   hdfs    mapreduce
aaa12   hdfs    mapreduce
aaa13   hdfs    mapreduce
aaa14   hdfs    mapreduce
aaa15   hdfs    mapreduce
aaa16   hdfs    mapreduce
aaa17   hdfs    mapreduce

4、hadoop streaming 语法参考

以上是关于hadoop mapreduce开发实践文件合并(join)的主要内容,如果未能解决你的问题,请参考以下文章

实验5 MapReduce初级编程实践——编程实现文件合并和去重操作

hadoop mapreduce开发实践之HDFS压缩文件(-cacheArchive)

hadoop mapreduce开发实践之HDFS文件分发by streaming

实验5 MapReduce初级编程实践(Python实现)

hadoop mapreduce开发实践之输出数据压缩

MapReduce编程实践——WordCount运行实例(Python实现)