用于日志分析的 Map Reduce 作业未在 Hadoop 2.7.3 伪分布式模式下运行

Posted

技术标签:

【中文标题】用于日志分析的 Map Reduce 作业未在 Hadoop 2.7.3 伪分布式模式下运行【英文标题】:Map Reduce job for log analysis not running in Hadoop 2.7.3 pseudo distributed mode 【发布时间】:2019-02-04 11:55:53 【问题描述】:

我是大数据领域的新手,并分配了一个 POC 来处理 Web 应用程序生成的日志。我已经成功地在 linux VM 上以伪分布式模式设置了 hadoop,并设法使用 flume 将 web 服务器日志从 windows 服务器注入到 hdfs。接下来,我编写了一个用于日志分析的 mapreduce 程序,在 eclipse 中运行良好。但是当我导出 jar 并将其移动到 hadoop VM 时,作业成功完成,在 hdfs 中创建的输出目录但 part-* 文件为空。我已经通过在本地 Eclipse 上测试代码来验证我的输入数据集。我尝试了远程应用程序调试,main 方法中的断点命中但 map 方法中的断点没有。如有任何帮助,我们将不胜感激。

我已经在互联网上进行了足够多的搜索,但找不到类似的东西。 下面是我的代码

public class OneLoadTransactionsSuccessCount 

    public static class OneLoadLogMapper extends Mapper<Object, Text, Text, IntWritable> 

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private static final Logger logger = Logger.getLogger(OneLoadLogMapper.class);
    private final static String SUCCESS_CODE = "0";

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        SimpleDateFormat sdf = new SimpleDateFormat("hh:mm dd MMM, yyyy");
        String text = value.toString();
        logger.info("text:::: " + text);
        System.out.println("text:::: " + text);
        int startingTag = text.indexOf("[#");
        int endingTag = text.indexOf("#]");
        if (startingTag != -1 && endingTag != -1) 
        try 

            String completeLog = text.substring(startingTag + 1, endingTag);
            logger.info("completeLog:::: " + completeLog);
            System.out.println("completeLog:::: " + completeLog);
            String[] tokens = completeLog.split("\\|");
            if (tokens != null && tokens.length > 0) 
            logger.info(tokens[1]);
            System.out.println(tokens[1]);
            if (tokens[6] != null) 
                String responseXML = tokens[6];
                logger.info("responseXML:::: " + responseXML);
                System.out.println("responseXML:::: " + responseXML);
                JAXBContext jaxbContext = JAXBContext.newInstance(LoadResponseMsg.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(responseXML);

                InputStream inputStream = new ByteArrayInputStream(
                    responseXML.getBytes(Charset.forName("UTF-8")));

                TransformerFactory factory = TransformerFactory.newInstance();
                Source xslt = new StreamSource(new File("removeNs.xslt"));
                Transformer transformer = factory.newTransformer(xslt);

                Source src = new StreamSource(inputStream);
                transformer.transform(src, new StreamResult(new File("tempoutput.xml")));

                File responseXMLFile = new File("tempoutput.xml");



                jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() 

                @Override
                public boolean handleEvent(ValidationEvent event) 
                    throw new RuntimeException(event.getMessage(), event.getLinkedException());
                
                );

                LoadResponseMsg loadResponseMsg = (LoadResponseMsg) jaxbUnmarshaller
                    .unmarshal(responseXMLFile);
                if (loadResponseMsg != null) 
                logger.info("reader:::: " + reader.toString());
                System.out.println("reader:::: " + reader.toString());
                if (loadResponseMsg.getResponseHeader().getResponseCode()
                    .equalsIgnoreCase(SUCCESS_CODE)) 
                    logger.info("status::: " + loadResponseMsg.getLoadResponse().getDescription());
                    System.out
                        .println("status::: " + loadResponseMsg.getLoadResponse().getDescription());
                    word.set(loadResponseMsg.getLoadResponse().getCompanyShortName());
                    context.write(word, one);
                
                

            

            
         catch (JAXBException e) 
            logger.error(e);
         catch (IOException e) 
            logger.error(e);
         catch (Exception e) 
            logger.error(e);
        
        
    
    

    public static class OneLoadLogReducer extends Reducer<Text, IntWritable, Text, IntWritable> 
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException 
        int sum = 0;
        for (IntWritable val : values) 
        sum += val.get();
        
        result.set(sum);
        context.write(key, result);
    
    

    public static void main(String[] args) throws Exception 
    try 

        Configuration conf = new Configuration();
        System.out.println("in main method");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(OneLoadTransactionsSuccessCount.class);
        job.setMapperClass(OneLoadLogMapper.class);
        job.setCombinerClass(OneLoadLogReducer.class);
        job.setReducerClass(OneLoadLogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.out.println("about to run the job");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
     catch (Exception e) 
        System.err.println(e);
        e.printStackTrace();
    

    

【问题讨论】:

【参考方案1】:

因此涉及到 IO 操作。我在制作 jar 时将所需的文件添加到类路径中,但我的开发环境是 windows,我在 Linux 上运行它。问题是因为不同的文件系统。程序在 Linux 中寻找错误位置的文件。

【讨论】:

以上是关于用于日志分析的 Map Reduce 作业未在 Hadoop 2.7.3 伪分布式模式下运行的主要内容,如果未能解决你的问题,请参考以下文章

关于在eclipse上能运行Map但无法运行Reduce的解决方法

您如何在 map/reduce 中实现排名和排序?

在编写 Map/Reduce 作业以找到平均值时需要帮助

MapReduce 作业继续以 map = 0%、reduce = 0% 运行数小时

day25 map,filter,reduce 内置函数,作业

Map-Reduce作业无法提供预期的分区文件