Hadoop[2] 带join的MapReduce
Posted 热爱生活的小熊猫
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop[2] 带join的MapReduce相关的知识,希望对你有一定的参考价值。
Hadoop系列 02-MR里面的join
在上一篇文章中,我通俗地介绍了一下MapReduce到底能解决什么问题,并没有涉及到复杂的理论与细节。在本文中,我同样不会涉及具体的理论(对,我只是想水一篇,但是基础不牢地动山摇啊),本文会举一个比较复杂一点的MapReduce的例子以确保你真的理解了MapReduce。先做个简单的预告,在后面的几篇文章里,我会分别介绍Hadoop Streaming的使用方法,具体的参数配置,MapReduce的工作原理等内容。
这里我想bb几句,关于MR我的切身经验是:在线上跑数据之前一定一定要先确保本地测试无误,MR代码没有bug,如果在线上发现代码有bug或者报错很难受的。
话不多说,进入正文。
先简单介绍一下join的概念 ,在数据库中经常会有连接操作,MR里的join其实本质上和数据库的join差不多,只不过一般需要根据业务内容自行调整,比较灵活。先回到大学数据库课堂,复习一下连接:
假如现在给你两张表,一张student表记录了学生的学号,姓名,班级等信息,一张score表记录了各个学号的考试成绩,现在需要从这两张表中统计出:学号、姓名、成绩,可以写下如下的SQL语句:
SELECT student.id, student.name, score.course, score.score FROM student, score WHERE student.id = score.id
嗯,看起来一切都很顺利,那么接下来就使用MR来实现这个过程。首先给定student信息:
2020001 zhangsan
2020002 lisi
2020003 wangwu
2020004 tom
2020005 jerry
2020006 bob
再给定score信息:
2020001 English 90
2020002 Chinese 100
2020006 Math 72
2020006 History 99
2020003 Policy 88
然后就要编写mapper脚本了,同样的使用python编写mapper.py脚本如下:
import sys
for line in sys.stdin:
data = line.strip().split()
if len(data) == 2:
print "%s\t%s" % (data[0], data[1])
if len(data) == 3:
print "%s\t%s\t%s" % (data[0], data[1], data[2])
这里简单解释一下上面的代码:map的工作内容就是对输入的每一行数据进行拆分,拆分为key/value对,在Hadoop Streaming中会默认使用第一个 \t 字符之前的字符串作为key,使用第一个 \t 字符之后的所有字符串作为value,因此可以看到print的内容都是包含 \t 的(实际使用中只有key也是可以的,value可以为空)。由于输入文件有两种类型:student中的每一行包含两列信息,而score中的每一行包含三列信息,为了使得别人也能够轻松看懂你的代码,因此我建议在这里使用 if 语句做一个区分。
接下来实现reducer.py脚本如下:
import sys
cur_id = None
cur_course = None
cur_score = dict()
data = [None, None, None]
for line in sys.stdin:
data = line.strip().split()
if cur_id != None and cur_course != None and len(cur_score) > 0:
for c in cur_score:
print "%s,%s,%s,%s" % (cur_id, cur_name, c, cur_score[c])
cur_id = data[0]
cur_name = None
cur_score.clear()
if len(data) == 2:
cur_name = data[1]
if len(data) == 3:
cur_score[data[1]] = data[2]
if cur_id == data[0] and cur_id != None and len(cur_score) > 0:
for c in cur_score:
print "%s,%s,%s,%s" % (cur_id, cur_name, c, cur_score[c])
上面的reduce代码也挺好理解的,和上一篇文章中的WordCount的reduce的区别主要在于增加了一个缓存cur_score,这是由于需要汇总来自两个文件中的数据,由于一个学生可能会有多个科目的成绩,因此需要将结果缓存下来。
由于多个输入文件的情况下,并不能确保同一个key的键值对哪个数据会排在前面哪个会排在后面,因此最稳妥的做法就是等当前key的所有value都收集完了再统一输出。但是这里是有优化空间的,在我碰到的问题中,可能这个缓存会占用10G+的内存空间,这开销太大了,因此这里是有一些优化的小trick的,感兴趣的可以自己想想。
好嘚,万事俱备只欠东风了,我们在Linux/Unix中敲下熟悉的测试指令:
cat ./student ./score | python mapper.py | sort | python reducer.py
如果一切正常,应该能够得到如下的结果:
2020001,zhangsan,english,90
2020002,lisi,chinese,100
2020003,wangwu,policy,88
2020006,bob,math,72
2020006,bob,history,99
好嘚,到这里本文就可以圆满结束了,在这之前我们回归理论。在这个例子里面的连接操作是在reduce中进行的,因此也叫做reduce端的join,这也是比较常用的一种;除此之外还有另外一种在map里的join,其应用场景是:有一个比较小的表(文件)能够在map的内存中存下,因此每当从较大的表中读取数据之后,就可以直接在map中进行join了。像更复杂的场景还包括:在map和reduce中均进行多次join,从更多个输入文件中进行多次join等等,都需要结合具体场景了。
写纯干货文章很累的,点个在看呗❀
以上是关于Hadoop[2] 带join的MapReduce的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop.2.x_高级应用_二次排序及MapReduce端join
大数据之Hadoop(MapReduce):Map Join
MapReduce实现两表的Join--原理及python和java代码实现
Hadoop3 - MapReduce Join 关联注意点