风暴中的 DRPC 服务器错误
Posted
技术标签:
【中文标题】风暴中的 DRPC 服务器错误【英文标题】:DRPC Server error in storm 【发布时间】:2014-12-12 03:58:52 【问题描述】:我正在尝试执行以下代码并收到错误..不确定我是否在这里遗漏了什么..还有我在哪里可以看到输出?
错误
java.lang.RuntimeException:没有为拓扑配置 DRPC 服务器 在 backtype.storm.drpc.DRPCSpout.open(DRPCSpout.java:79) 在storm.trident.spout.RichSpoutBatchTriggerer.open(RichSpoutBatchTriggerer.java:58) 在 backtype.storm.daemon.executor$fn__5802$fn__5817.invoke(executor.clj:519) 在 backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) 在 clojure.lang.AFn.run(AFn.java:24) 在 java.lang.Thread.run(Thread.java:744)
Code:
----
package com.**.trident.storm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.*;
import storm.trident.*;
import backtype.storm.*;
public class EventTridentDrpcTopology
private static final String KAFKA_SPOUT_ID = "kafkaSpout";
private static final Logger log = LoggerFactory.getLogger(EventTridentDrpcTopology.class);
public static StormTopology buildTopology(OpaqueTridentKafkaSpout spout) throws Exception
TridentTopology tridentTopology = new TridentTopology();
TridentState ts = tridentTopology.newStream("event_spout",spout)
.name(KAFKA_SPOUT_ID)
.each(new Fields("mac_address"), new SplitMac(), new Fields("mac"))
.groupBy(new Fields("mac"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("maccount"))
.parallelismHint(4)
;
tridentTopology
.newDRPCStream("mac_count")
.each(new Fields("args"), new SplitMac(), new Fields("mac"))
.stateQuery(ts,new Fields("mac"),new MapGet(), new Fields("maccount"))
.each(new Fields("maccount"), new FilterNull())
.aggregate(new Fields("maccount"), new Sum(), new Fields("sum"))
;
return tridentTopology.build();
public static void main(String[] str) throws Exception
Config conf = new Config();
BrokerHosts hosts = new ZkHosts("xxxx:2181,xxxx:2181,xxxx:2181");
String topic = "event";
//String zkRoot = topologyConfig.getProperty("kafka.zkRoot");
String consumerGroupId = "StormSpout";
DRPCClient drpc = new DRPCClient("xxxx",3772);
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, topic, consumerGroupId);
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new XScheme());
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
StormSubmitter.submitTopology("event_trident", conf, buildTopology(opaqueTridentKafkaSpout));
【问题讨论】:
【参考方案1】:您必须配置 DRPC 服务器的位置并启动它们。 请参阅http://storm.apache.org/releases/0.10.0/Distributed-RPC.html 上的远程模式 DRPC
启动 DRPC 服务器 配置 DRPC 服务器的位置 向 Storm 集群提交 DRPC 拓扑 启动 DRPC 服务器可以使用 Storm 脚本完成,就像启动 Nimbus 或 UI:
bin/storm drpc
接下来,您需要配置 Storm 集群以了解 DRPC 服务器的位置。这就是 DRPCSpout 知道从哪里读取函数调用的方式。这可以通过storm.yaml 文件或拓扑配置来完成。通过storm.yaml配置它看起来像这样:
drpc.servers: - “drpc1.foo.com” - “drpc2.foo.com”
【讨论】:
我是新手,如何设置 drpc nstorm.yaml 的配置我的意思是这里的 foo 是什么意思?如果 drpc 有端口号怎么办?以上是关于风暴中的 DRPC 服务器错误的主要内容,如果未能解决你的问题,请参考以下文章