How to run a topology on a Storm cluster? I can't see the output log

Posted: 2014-06-24 16:21:22

I'm trying to execute a topology on a cluster. I wrote the topology, compiled it into a jar, and registered it on the cluster, but for some reason the topology does not seem to run. I only want to use Storm as a pipe here. The command used to register it:
./storm jar /tmp/storm_test.jar storm.topology.MyTopology /tmp/bigFile.log
The topology:
package storm.topology;

import storm.spouts.LineReaderSpout;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.bolts.Bolt;

public class MyTopology {
    public static long tupleCounter = 0;
    public static long endTime = 0;

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put("inputFile", args[0]);
        config.put(Config.TOPOLOGY_WORKERS, 4);
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("line-reader-spout", new LineReaderSpout());
        builder.setBolt("boltA", new Bolt()).shuffleGrouping("line-reader-spout");

        StormSubmitter.submitTopology("mytopology", config, builder.createTopology());
    }
}
The bolt:
package storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import storm.topology.MyTopology;
import storm.spouts.LineReaderSpout;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Bolt implements IRichBolt {
    Integer id;
    String name;
    static long totalTime;
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        MyTopology.tupleCounter++;
        if (input.getString(0).contains("END")) {
            MyTopology.endTime = System.nanoTime();
            totalTime = MyTopology.endTime - LineReaderSpout.startTime;
            double tuplePerSec = MyTopology.tupleCounter / (totalTime / 1000000000d);
            System.out.println("====================================================");
            System.out.println("Number of tuples: " + MyTopology.tupleCounter);
            System.out.println("Test results: " + NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) + " tuple/sec");
            System.out.println("Total run time: " + totalTime + " nsec");
            System.out.println("====================================================");
            try {
                PrintWriter writer = new PrintWriter("/tmp/storm_results.log", "UTF-8");
                writer.println("Number of tuples: " + MyTopology.tupleCounter);
                writer.println("Test results: " + NumberFormat.getNumberInstance(Locale.US).format(tuplePerSec) + " tuple/sec");
                writer.println("Total run time: " + totalTime + " nsec");
                writer.println("====================================================");
                writer.close();
            } catch (FileNotFoundException ex) {
                Logger.getLogger(Bolt.class.getName()).log(Level.SEVERE, null, ex);
            } catch (UnsupportedEncodingException ex) {
                Logger.getLogger(Bolt.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        // Ack every tuple: the spout emits with a message ID, so unacked
        // tuples would otherwise time out and be replayed.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing downstream.
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
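As an aside, the throughput arithmetic buried in `execute` can be pulled out into a pure helper (illustrative, not part of the original topology), which makes it easy to sanity-check the numbers off-cluster:

```java
// Illustrative helper (hypothetical name, not from the original code):
// the bolt's tuples-per-second calculation as a pure static function.
public class Throughput {
    // Tuples per second given a tuple count and an elapsed time in nanoseconds.
    public static double tuplesPerSecond(long tupleCount, long elapsedNanos) {
        return tupleCount / (elapsedNanos / 1000000000d);
    }

    public static void main(String[] args) {
        // 1,000 tuples over half a second
        System.out.println(Throughput.tuplesPerSecond(1000, 500000000L)); // prints 2000.0
    }
}
```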
The spout:
package storm.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LineReaderSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;
    public static long startTime;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("inputFile").toString());
            startTime = System.nanoTime();
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file "
                    + conf.get("inputFile"));
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (completed) {
            return; // the whole file has already been emitted
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                // Emit each line, anchored with the line itself as message ID.
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
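Note that this spout drains the entire file inside a single `nextTuple()` call, which fights Storm's one-emit-per-call model. A common alternative is to keep the reader open and emit one line per call; the reading logic can be factored into a plain helper (a sketch with a hypothetical class name, not Storm API) so it is testable with any `Reader`:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

// Hypothetical helper: pulls one line at a time so a spout's nextTuple()
// can emit a single tuple per invocation instead of looping over the file.
public class LineSource {
    private final BufferedReader reader;

    public LineSource(Reader in) {
        this.reader = new BufferedReader(in);
    }

    // Returns the next line, or null once the input is exhausted.
    public String nextLine() {
        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new RuntimeException("Error reading line", e);
        }
    }
}
```

In the spout, `open()` would wrap the `FileReader` in a `LineSource`, and `nextTuple()` would emit `collector.emit(new Values(line), line)` for one non-null line per call, idling when `nextLine()` returns null.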
Answer 1:

You can run the topology on a local cluster instead, so you don't need to set up a production cluster on a remote machine. The code looks like this:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
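A fuller local-mode sketch (assuming the same builder and config as the topology above; this requires the Storm dependency on the classpath, so it is a sketch rather than a verified program). The key point is that local mode runs in-process: the main thread must stay alive while the topology processes, and should tear the cluster down afterwards:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

// Local-mode runner sketch: submit, let the topology run, then tear down.
public class LocalRunner {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout/setBolt exactly as in MyTopology above ...

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());
        Thread.sleep(30000);              // give the topology 30s to process
        cluster.killTopology("mytopology");
        cluster.shutdown();
    }
}
```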
Comments:

It works on the local cluster, but for some reason not on the production cluster. I compiled the jar with NetBeans using JDK 1.6_35. See this doc: Running-topologies-on-a-production-cluster

Answer 2:

In a production cluster you won't see logs on the console. Go to the Storm log directory; there you will find the worker log files, and all log messages are written there. Also, in a production cluster you won't see every log message by default. For that, you have to enable the DEBUG option in the config:
cfg.put(Config.TOPOLOGY_DEBUG, true);