jstorm集成kafka
Posted 我从二院来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jstorm集成kafka相关的知识,希望对你有一定的参考价值。
本人是spark的拥趸,因为工作中需要用到jstorm,作记录如下。
pom.xml
<dependencies> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.6</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.6</version> </dependency> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>utf8</encoding> </configuration> </plugin> </plugins> </build>
没什么好说的,无非都是常规的东西。需要注意的是做好kafka的offset的维护。
其它需要注意的两点,异常消息的处理和限流。
异常消息的处理其实就是ack/fail的问题。使用BaseBasicBolt的话,它会自动帮你实现ack与fail。但需要手动抛出FailException。这样的话,一旦出现异常,整个topology就退出集群了,这是不可接受的。
无奈只有使用IRichBolt,手动去捕获异常。这样如果异常不是数据结构的问题,只是下游比如获取其它数据连接问题比如邮件服务器问题,那么失败了不去更新offset,下次启动的时候还能继续消费。也无需去手动重发。
如果需要重发的话,storm只能自己实现,jstorm可以通过以下方式:
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
想要实现ack/fail必须满足以下三点:
1. 在spout emit tuple的时候,要加上第3个参数messageid
2. 在配置中acker数目至少为1
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路
第1点,由于使用的是KafkaSpout,已经实现了。
第2点:
config.setNumAckers(1);
第3点:
collector.emit(input,new Values(map));
限流的问题,主要是考虑到可能会有的这么一种场景。如果jstorm集群意外退出,或者升级出现情况,导致长时间无法重启。而这时候kafka集群生产端消息源源不断在产生新的消息。当重启jstorm集群的时候,势必会导致消息大量涌入jstorm集群。
还有一种场景就是,消息量不稳定,时大时小,那么非常有必须设置这个参数进行限流。
那么这时候需要对消息进行限流。在spark streaming中可以对kafka每个分区每秒的消息数进行限制;考虑到如果直接写死一个值,在低谷期间会造成资源的浪费,可以通过资源实现情况来限流。
而jstorm则通过topology.max.spout.pending来设置。它表示jstorm集群中可能缓存也就是待消费的消息数。如果大于这个数,新的消息就不会进来。
以上是关于jstorm集成kafka的主要内容,如果未能解决你的问题,请参考以下文章
Storm编程之wordcount(kafka--》Jstorm--》redis)