风暴拓扑不起作用
Posted
技术标签:
【中文标题】风暴拓扑不起作用【英文标题】:Storm Topology not working 【发布时间】:2014-10-25 19:51:46 【问题描述】:我是storm新手,但我仍然在本地机器上配置了storm。我做了一个 Eclipse 项目,并从互联网上遵循了一个简单的例子。现在我的拓扑正在提交,但它不起作用。
是否提交了拓扑? 是的,它已成功提交,因为我可以在 Storm ui 上看到它。
我的拓扑结构的工作是打印一个数字,如果它是一个素数。但它没有打印它。
我提供的代码如下:
Spout 类:
public class NumberSpout extends BaseRichSpout
private SpoutOutputCollector collector;
private static final Logger LOGGER = Logger.getLogger(SpoutOutputCollector.class);
private static int currentNumber = 1;
@Override
public void open( Map conf, TopologyContext context, SpoutOutputCollector collector )
this.collector = collector;
@Override
public void nextTuple()
// Emit the next number
LOGGER.info("Coming in spout tuble method");
collector.emit( new Values( new Integer( currentNumber++ ) ) );
@Override
public void ack(Object id)
@Override
public void fail(Object id)
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare( new Fields( "number" ) );
螺栓类:
public class PrimeNumberBolt extends BaseRichBolt
private static final Logger LOGGER = Logger.getLogger(PrimeNumberBolt.class);
private OutputCollector collector;
public void prepare( Map conf, TopologyContext context, OutputCollector collector )
this.collector = collector;
public void execute( Tuple tuple )
int number = tuple.getInteger( 0 );
if( isPrime( number) )
LOGGER.info("Prime number printed is: )" +number);
System.out.println( number );
collector.ack( tuple );
public void declareOutputFields( OutputFieldsDeclarer declarer )
declarer.declare( new Fields( "number" ) );
private boolean isPrime( int n )
if( n == 1 || n == 2 || n == 3 )
return true;
// Is n an even number?
if( n % 2 == 0 )
return false;
//if not, then just check the odds
for( int i=3; i*i<=n; i+=2 )
if( n % i == 0)
return false;
return true;
拓扑类:
public class PrimeNumberTopology
public static void main(String[] args)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "spout", new NumberSpout(),1 );
builder.setBolt( "prime", new PrimeNumberBolt(),1 )
.shuffleGrouping("spout");
Config conf = new Config();
conf.put(Config.NIMBUS_HOST, "127.0.0.1");
conf.setDebug(true);
Map storm_conf = Utils.readStormConfig();
storm_conf.put("nimbus.host", "127.0.0.1");
Client client = NimbusClient.getConfiguredClient(storm_conf)
.getClient();
String inputJar = "/home/jamil/Downloads/storm-twitter-word-count-master/target/storm-test-1.0-SNAPSHOT.jar";
NimbusClient nimbus = new NimbusClient("127.0.0.1",6627);
// upload topology jar to Cluster using StormSubmitter
String uploadedJarLocation = StormSubmitter.submitJar(storm_conf,
inputJar);
try
String jsonConf = JSONValue.toJSONString(storm_conf);
nimbus.getClient().submitTopology("newtesttopology",
uploadedJarLocation, jsonConf, builder.createTopology());
catch (AlreadyAliveException ae)
ae.printStackTrace();
catch (InvalidTopologyException e)
// TODO Auto-generated catch block
e.printStackTrace();
catch (TException e)
// TODO Auto-generated catch block
e.printStackTrace();
现在我想问为什么它不打印?或者为什么不将其写入日志文件?
PLUS:我正在从 Eclipse 提交拓扑。
【问题讨论】:
您希望在哪里打印?系统打印永远不会重定向到日志文件。 我也在使用 log4j 记录器记录它们。 那你是怎么配置log4j的呢? 我没有为这个项目配置 log4j。 我也在项目中放置了 log4j 属性文件。但它仍然没有记录。 【参考方案1】:除了 @Thomas Jungblut
所说的(关于您的 log4j 配置)并假设这是您的拓扑的完整源代码,然后看看您的 spout 的 nextTuple()
方法。
您的 spout 只是发出一个值,仅此而已。您很有可能在控制台中错过了该发射的输出,因为它隐藏在大量其他日志输出之下。
您确定只想发出一个值吗?
【讨论】:
以上是关于风暴拓扑不起作用的主要内容,如果未能解决你的问题,请参考以下文章