How to run a topology on a Storm cluster? I can't see the output log

Posted: 2014-06-24 16:21:22

[Question]:

I am trying to run a topology on a cluster. I wrote a topology, compiled it into a jar, and submitted it to the cluster, but for some reason the topology does not seem to run. I only want to use Storm as a pipeline here. Submit command:

./storm jar /tmp/storm_test.jar storm.topology.MyTopology /tmp/bigFile.log

Topology:

package storm.topology;

import storm.spouts.LineReaderSpout;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.bolts.Bolt;

public class MyTopology {
    public static long tupleCounter = 0;
    public static long endTime = 0;

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put("inputFile", args[0]);
        config.put(Config.TOPOLOGY_WORKERS, 4);
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("line-reader-spout", new LineReaderSpout());

        builder.setBolt("boltA", new Bolt()).shuffleGrouping("line-reader-spout");

        StormSubmitter.submitTopology("mytopology", config, builder.createTopology());
    }
}

Bolt:

package storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import storm.topology.MyTopology;
import storm.spouts.LineReaderSpout;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Bolt implements IRichBolt {
    Integer id;
    String name;
    static long totalTime;
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        MyTopology.tupleCounter++;
        if (input.getString(0).contains("END")) {
            MyTopology.endTime = System.nanoTime();

            System.out.println("====================================================");
            System.out.println("Number of tuples: " + MyTopology.tupleCounter);

            totalTime = MyTopology.endTime - LineReaderSpout.startTime;
            double tuplePerSec = MyTopology.tupleCounter / (totalTime / 1000000000d);
            System.out.println("Test results: " + NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) + " tuple/sec");
            System.out.println("Total run time: " + totalTime + " nsec");
            System.out.println("====================================================");
            PrintWriter writer;

            try {
                writer = new PrintWriter("/tmp/storm_results.log", "UTF-8");
                writer.println("Number of tuples: " + MyTopology.tupleCounter);
                writer.println("Test results: " +
                        NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) + " tuple/sec");
                writer.println("Total run time: " + totalTime + " nsec");
                writer.println("====================================================");
                writer.close();
            } catch (FileNotFoundException ex) {
                Logger.getLogger(Bolt.class.getName()).log(Level.SEVERE, null, ex);
            } catch (UnsupportedEncodingException ex) {
                Logger.getLogger(Bolt.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        // Ack every tuple so the spout's pending count
        // (TOPOLOGY_MAX_SPOUT_PENDING is set to 10) does not fill up.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Spout:

package storm.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LineReaderSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;
    public static long startTime;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("inputFile").toString());
            startTime = System.nanoTime();
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file "
                    + conf.get("inputFile"));
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (completed) {
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

[Comments]:

[Answer 1]:

You can use a local cluster and submit the topology to it; that way you don't need to set up a production cluster on a remote machine. The code looks like this:

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("mytopology", conf, builder.createTopology());

[Comments]:

It works on a local cluster, but for some reason not on the production cluster. I compiled the jar with NetBeans on JDK 1.6_35.

See this document: Running-topologies-on-a-production-cluster

[Answer 2]:

On a production cluster you will not see the logs on the console. Go to the Storm log directory; there you will find the worker log files, and all log messages are written there. Also, on a production cluster you will not see every log message by default. For that you have to enable the DEBUG option in the config: cfg.put(Config.TOPOLOGY_DEBUG, true);
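The advice above can be sketched as a shell session on a supervisor node. The paths here are assumptions: the actual log location is whatever storm.log.dir points to in your install (commonly $STORM_HOME/logs or /var/log/storm), and each worker writes its own file named after its port, e.g. worker-6700.log.

```shell
# Assumed default location -- adjust to your storm.log.dir setting.
STORM_LOG_DIR="${STORM_LOG_DIR:-/var/log/storm}"

# List the per-worker log files (one per worker port, e.g. worker-6700.log).
ls "$STORM_LOG_DIR" 2>/dev/null || echo "no such dir: $STORM_LOG_DIR"

# System.out.println output from bolts ends up in these files, so search
# them for the summary line the bolt prints:
grep -h "Number of tuples" "$STORM_LOG_DIR"/worker-*.log 2>/dev/null || true
```

Note that with TOPOLOGY_WORKERS set to 4, the output may land in any of the four workers' files, possibly on different supervisor machines.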

[Comments]:
