Hadoop 分布式缓存 (Cloudera CH3)

Posted

技术标签:

【中文标题】Hadoop 分布式缓存 (Cloudera CH3)【英文标题】:Hadoop Distributed Cache (Cloudera CH3) 【发布时间】:2010-12-29 15:12:11 【问题描述】:

我正在尝试使用二进制可执行文件和 缓存的存档,它似乎没有工作:

我尝试运行的示例有一个映射器,它生成三个 随机双打和一个键,reducer 将平均这三个 数字在一起并记录平均值。很简单的东西。我写了一个 c 中的简单 EXE 会生成随机数:

#include <cstdio>
#include <stdlib.h>
#include <time.h> 

int main(int argc, char*argv[]) 
    srand ( time(NULL) );
    int rand1 = rand() % 10 + 1;
    int rand2 = rand() % 10 + 1;
    int rand3 = rand() % 10 + 1;
    printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
    return 0; 

所以如果我调用 ./a.out [key]

我会看到

key, random1, random2, random3

我使用 python 流,这是我用 python 编写的映射器:


#!/usr/bin/python

import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE

from misc import *

for line in sys.stdin:
    line = line.strip()
    sline = line.split(',')

    # parse out the au and cost from the line ....
    key = int( sline[0] )
    au = int( sline[1])
    cost = float( sline[2] )

    command = "./a.out %d" % ( key )
    cli_parts = shlex.split(command)
    mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE,
stdout=PIPE)
    print mp.communicate()[0].strip('\r\n')

这里是只做平均的reducer:


#!/usr/bin/python

import os
import sys
import math
import re

from misc import *

for line in sys.stdin:
    line = line.strip()
    m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
    if m:
        average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
float(m.groups(0)[3])) / 3
        print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
    else:
        print "not found"

#f.close()

所以在阅读文档后,我似乎需要编译 二进制文件和 tar.gz-it

1) tar cvaf a.out.tar.gz a.out

现在我应该能够通过 - cacheArchive 参数,一切都应该正常工作。这是我的 Hadoop 命令:

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop- 流式传输-0.20.2+737.jar \ -numReduceTasks 1 \ -映射器映射器1.py \ -文件映射器1.py \ -reducer reducer1.py \ -文件reducer1.py \ -文件 misc.py \ -cacheArchive a.out.tar.gz \ -输入输入/* \ -输出测试vm输出\ -详细

不用说,这不起作用,似乎是因为 mapper 没有生成数据。

我通过在命令行上测试确认我的代码可以正常工作:

猫输入/svminput1.txt | python mapper1.py |排序 | Python reducer1.py

我很想有人解释为什么这不起作用,如何 通过 cacheArchive 命令传递 exe 对数据节点起作用, 和/或如何调试它,因为错误消息来自 Cloudera html 面板没有那么有用。

谢谢

这是我看到的错误:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)

【问题讨论】:

【参考方案1】:

我看到你做错了一些事情。你想设置你的 python 脚本 chmod a+x 并像这样进行测试 cat input/svminput1.txt | ./mapper1.py |排序 | ./reducer1.py 因为这基本上是 Hadoop 在流式传输中所做的就是启动脚本(操作系统使用正确的解释器处理执行脚本)

现在对于移动到作业中以与您的映射器和减速器一起使用的其他文件,您只需通过命令行将它们添加到您想要的文件中(就像您使用 misc.py 一样),当您的映射/缩减启动时,这些文件是当地的 ”。”到你的脚本,所以导入并使用它们或你想要的任何东西(打开一个文本文件,无论你想要什么)......你应该用 chacheArchive 东西来做这件事,也只需将它们每个都推送为 -file 应该没问题。

如果你还没有看过的话,这里是一个非常基本的关于 python 流的文章http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

这是一个更高级的 python 流,带有连接和键 http://allthingshadoop.com/2010/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/,可能也有帮助。

如果我认为不再需要处理特定错误,希望这会有所帮助

【讨论】:

我更改了脚本的权限,但仍未解决问题。我还将cacheArchive更改为文件并直接传递可执行文件,并没有修复它。我真的不明白没有关于使用 cacheArchive 参数和流媒体的教程。我不可能是唯一一个尝试使用它的人。感谢流媒体教程,我过去曾与他们合作过。 我也添加了上面的错误,我应该提到这在独立模式下工作,但是当我尝试将它分布在具有一个数据节点和一个命名节点的集群中时,它会失败 另外,如果我查看以下目录 /mnt/hadoop/mapred/local/taskTracker,我可以确认文件正在进入数据节点,我通过 cacheArchive 发送的文件是在目录中解压【参考方案2】:

您确定集群机器上的/usr/bin/python 提供python 吗?一个好的做法是始终在脚本顶部使用#!/usr/bin/env python...这样它就不会被硬编码。

还要确保检查集群机器上的 python 安装...确保导入工作正常。

您没有在代码中使用任何 try/excepts,因此很难调试问题所在...我建议尝试/排除您的代码并将日志消息打印到众所周知的位置,例如 @ 987654323@....

更多信息你可以查看davidvhill.com/articles....我的实际生产代码在这里被捕获....

【讨论】:

以上是关于Hadoop 分布式缓存 (Cloudera CH3)的主要内容,如果未能解决你的问题,请参考以下文章

《Cloudera hadoop大数据平台实战指南》此书预计2018年12月底上市

《Cloudera hadoop大数据平台实战指南》此书预计2018年12月底上市

Hadoop之Flume

Hadoop之Flume

Hadoop伪分布式环境快速搭建

Hadoop2.X分布式集群部署