使用 Kafka Spout 的 Kafka Storm 集成
Posted
技术标签:
【中文标题】使用 Kafka Spout 的 Kafka Storm 集成【英文标题】:Kafka Storm Integration using Kafka Spout 【发布时间】:2013-06-24 21:55:23 【问题描述】:我正在使用 KafkaSpout。请在下面找到测试程序。
我使用的是 Storm 0.8.1。 Storm 0.8.2 中有 Multischeme 类。我会用那个。我只想通过实例化 StringScheme() 类来了解早期版本是如何工作的?我在哪里可以下载早期版本的 Kafka Spout?但我怀疑这不是在 Storm 0.8.2 上工作的正确选择。 ??? (困惑)
当我在风暴集群上运行代码(如下所示)时(即,当我推送我的拓扑时)我收到以下错误(当 Scheme 部分被注释时会发生这种情况,当然我会收到编译器错误,因为类不是0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
在下面给出的代码中,您可能会发现 spoutConfig.scheme=new StringScheme();部分评论。如果我不评论那条很自然的行,我会收到编译器错误,因为那里没有构造函数。此外,当我实例化 MultiScheme 时,我会收到错误,因为我在 0.8.1 中没有该类。
public class TestTopology
public static class PrinterBolt extends BaseBasicBolt
public void declareOutputFields(OutputFieldsDeclarer declarer)
public void execute(Tuple tuple, BasicOutputCollector collector)
System.out.println(tuple.toString());
public static void main(String [] args) throws Exception
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
【问题讨论】:
我想我不明白这个问题:如果你去 0.8.2 就可以工作吗?如果是这样,为什么还要尝试在 0.8.1 中运行:0.8.2 用一些错误修复和其他改进来取代它。 【参考方案1】:我遇到了同样的问题。终于解决了,我把完整的运行示例放到了github上。
欢迎您在这里查看> https://github.com/buildlackey/cep
(单击storm+kafka 目录可查看示例程序,该程序可以帮助您启动并运行)。
【讨论】:
考虑在你的答案中添加一两句话来描述你所做的事情,这样你的答案就相关了,而不依赖于 Git 存储库处于活动状态。 当然:该项目包含单元测试和示例程序,说明如何在 Storm、Kafka 和 Esper 之上开发复杂事件处理 (CEP) 应用程序。 @neontapir 的意思是添加一些信息,说明该代码如何解决问题,即,这样您无需查看示例代码即可了解解决方案。【参考方案2】:我们遇到了类似的问题。
我们的解决方案:
打开 pom.xml
将范围从提供更改为<scope>compile</scope>
如果您想了解有关依赖范围的更多信息,请查看 maven 文档: Maven docu - dependency scopes
【讨论】:
以上是关于使用 Kafka Spout 的 Kafka Storm 集成的主要内容,如果未能解决你的问题,请参考以下文章
使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException
storm问题记录 python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理