如何使用Python为Hadoop编写一个简单的MapReduce程序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Python为Hadoop编写一个简单的MapReduce程序相关的知识,希望对你有一定的参考价值。

参考技术A MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python编写MapReduce程序,韩国的gogamza在其Bolg中也提到如何用C编写MapReduce程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapReduce程序。  首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二安装部署)。HadoopStreaming帮助我们用非Java的编程语言使用MapReduce,Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Reduce进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。  我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按"1"用标准的输出输出来,代表这个单词出现了一次。在Reduce中我们来统计单词的出现频率。    PythonCode  Map:mapper.py  #!/usr/bin/envpythonimportsys#mapswordstotheircountsword2count=#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#splitthelineintowordswhileremovinganyemptystringswords=filter(lambdaword:word,line.split())#increasecountersforwordinwords:#writetheresultstoSTDOUT(standardoutput);#whatweoutputherewillbetheinputforthe#Reducestep,i.e.theinputforreducer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)  Reduce:reducer.py  #!/usr/bin/envpythonfromoperatorimportitemgetterimportsys#mapswordstotheircountsword2count=#inputcomesfromSTDINforlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour#finaloutputwilllookmoreliketheofficialHadoop#wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)  CCode  Map:Mapper.c  #include#include#include#include#defineBUF_SIZE2048#defineDELIM"\n"intmain(intargc,char*argv[])charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin))intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query)printf("%s\t1\n",query);query=strtok(NULL,"");return0;h>h>h>h>  Reduce:Reducer.c  #include#include#include#include#defineBUFFER_SIZE1024#defineDELIM"\t"intmain(intargc,char*argv[])charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin))char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0')strcpy(strLastKey,strCurrKey);if(strcmp(strCurrKey,strLastKey))printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);elsecount+=atoi(strCurrNum);strcpy(strLastKey,strCurrKey);printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;h>h>h>h>  首先我们调试一下源码:  chmod+xmapper.pychmod+xreducer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./reducer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Reducer.c-oReducerchmod+xMapperchmod+xReducerecho"foofooquuxlabsfoobarquux"|./Mapper|./Reducerbar1foo2labs1quux1foo1quux1  你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.  在Hadoop中运行程序  首先我们要下载我们的测试文档wget页面中摘下的用php编写的MapReduce程序,供php程序员参考:Map:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false)//removeleadingandtrailingwhitespaceandlowercase$line=strtolower(trim($line));//splitthelineintowordswhileremovinganyemptystring$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word)$word2count[$word]+=1;//writetheresultstoSTDOUT(standardoutput)//whatweoutputherewillbetheinputforthe//Reducestep,i.e.theinputforreducer.pyforeach($word2countas$word=>$count)//tab-delimitedecho$word,chr(9),$count,PHP_EOL;?>  Reduce:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false)//removeleadingandtrailingwhitespace$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour//finaloutputwilllookmoreliketheofficialHadoop//wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count)echo$word,chr(9),$count,PHP_EOL;?>  作者:马士华发表于:2008-03-05

如何在python中为Hadoop Map Reduce作业编写组合器和分区器?我如何在Hadoop Job中调用它

如何在python中编写组合器和分区器作业并使用Hadoop Streaming调用它。

答案

请看看Pydoop。我没有探讨过这个,但根据文档,

Pydoop Script使您可以在几行代码中使用mapper和reducer函数为Had​​oop编写简单的MapReduce程序。当Pydoop Script不够用时,您可以切换到更完整的Pydoop API,它提供了实现Python Partitioner,RecordReader和RecordWriter的能力。 Pydoop可能不是所有Hadoop用例的最佳API,但其独特的功能使其适用于特定场景,并且正在积极改进。

Here是基于Python的hadoop组合器的SO问题。

附加参考

Reference Link

GitHub Link

另外this link详细介绍了各种其他可用的hadoop-python框架。

另一答案

你可以使用Yelp的MRJob。它很简单,并且有很好的文档,而且我自己也使用它 - 使用与hadoop的Java库相同的接口。是的,它使用的是hadoop流媒体 - 性能可能就是这样。但是,遗憾的是,您仍然需要在Java上编写分区程序。

以上是关于如何使用Python为Hadoop编写一个简单的MapReduce程序的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Python为Hadoop编写一个简单的MapReduce程序

如何使用Python为Hadoop编写一个简单的MapReduce程序

如何在python中为Hadoop Map Reduce作业编写组合器和分区器?我如何在Hadoop Job中调用它

如何使用 hbase 作为 hadoop 流作业的来源

如何快速地编写和运行一个属于自己的MapReduce例子程序

在 Hadoop 上部署 Python pip 包?