storm学习笔记
Posted sheen1991
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm学习笔记相关的知识,希望对你有一定的参考价值。
Storm学习笔记
一、简介
本文使用的Storm版本为1.0.1
Storm是一个免费开源的分布式实时计算系统,它使得可靠地处理无限的数据流更加容易,可以实时的处理Hadoop的批量任务。Storm简单易用,且支持各种主流的程序语言。
Storm有很多适用场景:实时分析、在线机器学习、连续计算、分布式RPC、分布式ETL、易扩展、支持容错,可确保你的数据得到处理,易于构建和操控。
下图是Storm“流式数据处理”的概念图,即数据像水流一样从数据源头源源不断的流出,经过每个节点,每个节点根据自己的需求对数据进行实时的处理,并继续发送给下游。
二、Storm架构
Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,所以 也称之为Nimbus节点,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为“Supervisor”的守护进程,也成为Supervisor节点,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都是快速失败(无论何时当遇到任何异常情况,将会执行自毁),而且是无状态的(所有的状态都保存在Zookeeper或者磁盘上),这样一来它们就变得十分健壮。在Storm的主控节点和工作节点之间,还有一层框架,Zookeeper集群。Zookeeper负责Nimbus节点和Supervisor节点之间的协调工作,用于管理集群中的不同组件。Supervisor节点上包含了很多Worker节点,具体的任务就是由Worker节点来执行的。
Storm的架构如下如所示:
在Storm集群的实际应用中,Zookeeper集群官方推荐的节点数至少为3个,对应于3台机器,如果需要的话再扩展。Zookeeper节点应是奇数个,因为Zookeeper集群也分为一个Leader节点和若干Follower节点,当Leader节点挂掉时,需要选举出新的Leader节点,奇数保证了选举的成功执行。Storm节点数则根据业务的吞吐量需求而定,一般需要一台机器作为Nimubus节点,其他一些作为Supervisor节点。Worker节点是一个逻辑概念,在一个Supervisor机器上可以有很多个Worker节点。关于Worker节点的概念下文有详细说明。
在Storm1.0.0版本之前,Storm集群中Nimbus节点是单点的,这也就意味着Nimbus节点挂了不会有备用节点补位。不过在Storm最初设计时就考虑了这点,Nimbus节点只负责监控Supervisor状态和任务分发,不负责具体的执行逻辑,所以Nimbus节点的负担非常之请,当Nimbus挂掉时不会影响各个worker节点的运行,重启后可以继续工作。但毕竟这样的设计降低了系统的容错性,比如当新任务分配时,或者某个supervisor挂掉的时候,没有Nimbus节点的支持,将无法对这些任务进行分发或者再分配。因此,Storm在1.0.0之后的版本提供了HA Nimbus机制,类似Zookeeper集群的Leader和Follower节点,当一个主控节点挂掉的时候,可以选举另外的节点为新的Nimbus节点。笔者查看了Apache Storm的官方网站,找到了关于这块内容的更新,下面是原文:
Experienced Storm users will recognize that the Storm Nimbus service is not a single point of failure in the strictest sense (i.e. loss of the Nimbus node will not affect running topologies). However, the loss of the Nimbus node does degrade functionality for deploying new topologies and reassigning work across a cluster.
In Storm 1.0 this “soft” point of failure has been eliminated by supporting an HA Nimbus. Multiple instances of the Nimbus service run in a cluster and perform leader election when a Nimbus node fails, and Nimbus hosts can join or leave the cluster at any time. HA Nimbus leverages the distributed cache API for replication to guarantee the availability of topology resources in the event of a Nimbus node failure.
链接:http://storm.apache.org/2016/04/12/storm100-released.html
三、基础概念
拓扑(Topology)
Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。Spout 和 Bolt 称为拓扑的组件(Component)。
在Java中使用TopologyBuilder来构建拓扑。
数据流(Data Stream)
数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。在默认情况下,元组(tuple)包含有整型(Integer)数字、长整型(Long)数字、短整型(Short)数字、字节(Byte)、双精度浮点数(Double)、单精度浮点数(Float)、布尔值以及字节数组等基本类型对象。当然,你也可以通过定义可序列化的对象来实现自定义的元组类型。
在声明数据流的时候需要给数据流定义一个有效的 id。不过,由于在实际应用中使用最多的还是单一数据流的 Spout 与 Bolt,这种场景下不需要使用 id 来区分数据流,因此可以直接使用 OutputFieldsDeclarer来定义“无 id”的数据流。实际上,系统默认会给这种数据流定义一个名为“default”的 id。
元组(Tupe)
Tuple是Storm中最小的数据传输单元,可以理解为一个值列表或者键值对,其键(有的地方也称之为“域名”或者“字段”,在Storm中用Field类代表)在Spout或者Bolt中通过declareOutputFields()方法定义,值在emit()方法中指定。具体参见后面的Spout/Bolt介绍。Tuple 中的值可以是任何类型的,动态类型的Tuple 的fields 可以不用声明;默认情况下,Storm 中的Tuple 支持私有类型、字符串、字节数组等作为它的字段值,如果使用其他类型,就需要序列化该类型。
Tuple 的字段默认类型有:integer、float、double、long、short、string、byte、binary(byte[])。典型的Tupe结构如下图所示:
数据源(Spout)
数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout 能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。
Spout中的关键方法有ack、fail、open、nextTupe以及declareOutputFields等,下面一一介绍。
nextTupe:首先是最关键的方法nextTuple,是Spout中发送元组Tupe执行方法。顾名思义,nextTuple 要么会向拓扑中发送一个新的元组,要么会在没有可发送的元组时直接返回。方法中通过调用SpoutOutputCollector的emit方法将元组发射出去。需要注意的是,由于 Storm 是在同一个线程中调用所有的 Spout 方法,nextTuple 不能被 Spout 的任何其他功能方法所阻塞,否则会直接导致数据流的中断(关于这一点,阿里的 JStorm 修改了 Spout 的模型,使用不同的线程来处理消息的发送,这种做法有利有弊,好处在于可以更加灵活地实现 Spout,坏处在于系统的调度模型更加复杂,如何取舍还是要看具体的需求场景吧——译者注)。
**ack and fail:**Spout 中另外两个关键方法是 ack 和 fail,他们分别用于在 Storm 检测到一个发送过的元组已经被成功处理或处理失败后的进一步处理。注意,ack 和 fail 方法仅仅对上述“可靠的” Spout 有效。可靠的数据流保证了spout发送出去的元祖都能得到反馈,保证其被处理,但同时也增加了系统的开销,在设计过程中需要根据业务需求在可靠性和性能之间权衡。
可靠的spout
collector.emit(List tuple, Object messageId);
不可靠的spout
collector.emit(List tuple);
在使用了可靠的消息保证机制时,需要下游的bolt的execute方法中调用:
collector.ack(Tupe input);
以通知上游,该Tupe已经被当前节点处理完成。当Spout在超时时间内未等到ack确认或者fail确认,Storm将认为该节点处理失败,则调用spout的fail方法处理;相应地,如果处理成功,则调用Spout的ack方法。更详细的信息请参见下文或者链接“消息可靠性保证”
declareOutputFields:这是Spout中一个很有用的方法,用来定义不同的数据流和元组。在一个topology中,一个Spout可能发送很多数据消息,同时下游也可能有很多Bolt组件接收Spout发出的消息,但往往某个Bolt并不想接收Spout发来的所有数据,可能只需要接收某一类型数据流中的某些数据。Storm为我们提供给了这样的“订阅”机制,即Spout可以发送多种多样的数据流,而下游的Bolt可以根据自己的需求进行订阅,其实现的关键方法就是declareOutputFields。下面是一个典型实现:
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
arg0.declare(new Fields("institude", "institude2", "institude3"));
arg0.declareStream("testStreamId", new Fields("institude4","institude5"));
}
@Override
public void nextTuple() {
FraudTrans ft = (FraudTrans) input.getValue(0);
FraudTrans ft2 = new FraudTrans();
FraudTrans ft3 = new FraudTrans();
List anchors = new ArrayList();
collector.emit(new Values(ft,ft2,ft3));
collector.emit("testStreamId", new Values(ft2, ft3));
}
其中declareOutputFields方法中第一句是声明了一个包含三个FieldName的元组,则在nextTupe中第一个emit中发送了分别与声明的FieldName对应的数据值;在declareOutputFields的第二句中,定义了一个两个FieldName的元组,同时将其绑定了数据流ID为“testStreamId”,同时在nextTupe中发送该ID的数据流。这样,在后面的Bolt就可以选择订阅或者不订阅此streamId的数据流,并且根据自己的需要获取某个FieldName对应的值,而不是接收Spout发来的所有数据。
**open:**open方法是在打开Spout数据源的时Storm为我们调用的,相当于一个初始化方法,因此里面可以执行一些必要的初始化代码。但需要注意的是,在并发的Spout中,有多少个线程(executor),该方法就会被调用多少次。有关executor的概念在下文中将提及。因此在实际项目中,该处牵涉到的一些资源问题,需要慎重使用。
使用Spout时的注意事项
最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据;另外的一个或多个线程负责从数据源(如各种消息中间件、db等)读取数据并放入queue中。
如果不关心数据是否丢失(例如数据统计分析的典型场景),不要启用ack机制。
Spout的nextTuple和ack方法是在同一个线程中被执行的(可能最初觉得这块不会成为瓶颈,为了简单实现起见就单线程了,jstorm应该是已经改成了多线程),因此不能在nextTuple或ack方法里block住当前线程,这样将直接影响spout的处理速度,很关键。
Spout的nextTuple发送数据时,不能阻塞当前线程(见上一条),比如从queue中取数据时,使用poll接口而不是take,且poll方法尽量不要传参阻塞固定时间,如果queue中没有数据则直接返回;如果有多条待发送的数据,则一次调用nextTuple时遍历全部发出去。
Spout从0.8.1之后在调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。
对于一个指定的Spout Task,可能会有一些发出去的Tupe在等待处理,还没收到ack或者fail确认,则该Tupe将被pending挂起从而不影响后续Tupe的发送。通过Config.TOPOLOGY_MAX_SPOUT_PENDING配置可以设定最大允许pending的Tupe数量,比如设置成1000/5000或者其他合理的数值,从而防止Tupe挂起过多撑爆内存。建议在生产环境中设置此项,但值是多少需要根据系统吞吐量合理设置,因为太小可能会总成速度太慢,太大会占用内存。另外需要注意此项配置仅对可靠处理的Tupe有效。
注:declare和emit指定streamId的数据流后,允许不被订阅,一旦订阅,就要求订阅的streamId必须已经定义,都则在提交Topology任务时会报错InvalidTopologyException。
数据流处理组件(Bolt)
拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。
一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。例如,将一个微博数据流转换成一个趋势图像的数据流至少包含两个步骤:其中一个 Bolt 用于对每个图片的微博转发进行滚动计数,另一个或多个 Bolt 将数据流输出为“转发最多的图片”结果(相对于使用2个Bolt,如果使用3个 Bolt 你可以让这种转换具有更好的可扩展性)。
Bolt中的关键方法有execute、prepare、declareOutputFields和cleanup等。
declareOutputFields:与 Spout 相同,Bolt 也可以输出多个数据流。为了实现这个功能,可以先通过 OutputFieldsDeclarer 的 declareStream 方法来声明定义不同的数据流,然后在发送数据时在 OutputCollector 的 emit 方法中将数据流 id 作为参数来实现数据发送的功能。
在定义 Bolt 的输入数据流时,你需要从其他的 Storm 组件中订阅指定的数据流。如果你需要从其他所有的组件中订阅数据流,你就必须要在定义 Bolt 时分别注册每一个组件。
**execute:**Bolt 的关键方法是 execute 方法。execute 方法负责接收一个元组作为输入,并且使用 OutputCollector 对象发送新的元组。在接收时,可以通过Tupe.getValueByField()方法获取指定的元组,也可以根据元组List的下标接收或者全部接收。
如果有消息可靠性保障的需求,Bolt 必须为它所处理的每个元组调用 OutputCollector 的 ack 方法,以便 Storm 能够了解元组是否处理完成(并且最终决定是否可以响应最初的 Spout 输出元组树)。一般情况下,对于每个输入元组,在处理之后可以根据需要选择不发送还是发送多个新元组,然后再响应(ack)输入元组。IBasicBolt 接口能够实现元组的自动应答。
对于需要保证消息可靠性的topology,bolt也需要在emit数据的时候,将传入的Tupe作为anchor进行锚定,相关概念参见下文或者链接“消息可靠性保证”
prepare:此方法类似Spout中的open方法,在初始化bolt时调用。同样地,如果一个bolt有多个executor线程,则该方法将被执行多次。
cleanup:在bolt执行完毕后关闭时执行,可以释放一些资源等。需要注意的是,在本地模式(LocalCluster)中,该方法一定会执行,但是在集群模式下,Storm不保证该方法一定执行。
数据流分组(Stream Grouping)
为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。
在 Storm 中有八种内置的数据流分组方式(原文有误,现在已经已经有八种分组模型——译者注),而且你还可以通过CustomStreamGrouping 接口实现自定义的数据流分组模型。这八种分组分时分别为:
- 随机分组(Shuffle grouping):这种方式下元组会被尽可能随机地分配到 Bolt 的不同任务(tasks)中,使得每个任务所处理元组数量能够能够保持基本一致,以确保集群的负载均衡。
- 域分组(Fields grouping):这种方式下数据流根据定义的“域”来进行分组。例如,如果某个数据流是基于一个名为“user-id”的域进行分组的,那么所有包含相同的“user-id”的元组都会被分配到同一个任务中,这样就可以确保消息处理的一致性。
- 部分关键字分组(Partial Key grouping):这种方式与域分组很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能1。感兴趣的读者可以参考这篇论文,其中详细解释了这种分组方式的工作原理以及它的优点。
- 完全分组(All grouping):这种方式下数据流会被同时发送到 Bolt 的所有任务中(也就是说同一个元组会被复制多份然后被所有的任务处理),使用这种分组方式要特别小心。
- 全局分组(Global grouping):这种方式下所有的数据流都会被发送到 Bolt 的同一个任务中,也就是 id 最小的那个任务。
- 非分组(None grouping):使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组完全等效,不过未来 Storm 社区可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。
- 直接分组(Direct grouping):这是一种特殊的分组方式。使用这种方式意味着元组的发送者可以指定下游的哪个任务可以接收这个元组。只有在数据流被声明为直接数据流时才能够使用直接分组方式。使用直接数据流发送元组需要使用 OutputCollector 的其中一个 emitDirect 方法。Bolt 可以通过 TopologyContext 来获取它的下游消费者的任务 id,也可以通过跟踪 OutputCollector 的 emit 方法(该方法会返回它所发送元组的目标任务的 id)的数据来获取任务 id。
- 本地或随机分组(Local or shuffle grouping):如果在源组件的 worker 进程里目标 Bolt 有一个或更多的任务线程,元组会被随机分配到那些同进程的任务中。换句话说,这与随机分组的方式具有相似的效果。
具体某个bolt采用什么分组的方式,是在定义Topology的时候声明的,下面是一个典型实现:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("datasource", new DataSpout());
//随机分组
builder.setBolt("statistics", new StatisticsBolt()).shuffleGrouping(
"datasource");
//按照字段分组,保证了相同Field的消息由同一个task执行
builder.setBolt("institude_analysis", new InstitudeAnalysisBolt(), 2)
.fieldsGrouping("statistics",
new Fields("institude", "institude2"));
//仅订阅ID为streamId的数据流
builder.setBolt("institude_analysis2", new InstitudeAnalysisBolt2(), 2)
.fieldsGrouping("statistics", "testStreamId",
new Fields("institude4", "institude5"));
builder.setBolt("fraudSource_analysis", new FraudSourceAnalysisBolt())
.shuffleGrouping("institude_analysis");
Config conf = new Config();
conf.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("testA", conf, builder.createTopology());
Utils.sleep(120000);
cluster.killTopology("testA");
cluster.shutdown();
可靠性保证(Reliability)
Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理或者说完整地处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。
为了充分利用 Storm 的可靠性机制,你必须在元组树创建新结点的时候以及元组处理完成的时候通知 Storm。这个过程可以在 Bolt 发送元组时通过 OutputCollector 实现:在 emit 方法中实现元组的锚定(Anchoring),同时使用 ack 方法表明你已经完成了元组的处理。
一个tuple被”完全处理”是什么意思?
一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples。例如,有这样一个 word-count 拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
这个拓扑从一个 Kestrel 队列中读取句子,这些句子在Spout中依托Tupe发送出去,并且在发送时启用消息可靠性保障机制,也就是说调用了emit(Tupe tupe,Objece msgId)这个API,这样这个被发送的Tupe就开始构建一棵Tupe树,且自己为根节点。然后在SplitSentenceBolt中,将句子分解成若干个单词,然后将它每个单词和该单词的数量作为Tupe发送出去,并且在发送时通过emit(Tupe tupe,List tupe)或者emit(List anchor , List Tupe)锚定。这种情况下,这些新的Tupe就会被锚定在接收到的Tupe上,也就是说将这个Tupe树做了延伸,构成了一颗新的Tupe树。
在Bolt中,如果使用了emit(List tupe)方法,即未进行锚定,其下游的Bolt仍然可以通过emit(Tupe anchor,List tupe)的方式来对接收到的Tupe进行锚定,生成一颗新的Tupe-Tree,并不是说在Spout中未指定msgId,在后续节点就不可以生成Tupe树了,只不过相应的ack/fail机制只能顺着树结构找到根节点定义的位置,无法继续向上游传递相关消息。
如果这棵 tuple 树发送完成,并且树中的每一条消息都得到了正确的处理,就表明发送 tuple 的 spout 已经得到了“完整性处理”。对应的,如果在指定的超时时间内 tuple 树中有消息没有完成处理就意味着这个 tuple 失败了。这个超时时间可以使用 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 参数在构造拓扑时进行配置,如果不配置,则默认时间为 30 秒。
在消息得到完整性处理后或者处理失败后会发生什么
为了理解这个问题,让我们先了解一下 tuple 的生命周期。下卖弄是Spout接口的定义。
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先,通过调用 Spout 的 nextTuple 方法,Storm 向 Spout 请求一个 tuple。Spout 会使用 open 方法中提供的SpoutOutputCollector 向它的一个输出数据流中发送一个 tuple。在发送 tuple 的时候,Spout 会提供一个 “消息 id”,这个 id 会在后续过程中用于识别 tuple。例如,上面的 KestrelSpout 就是从一个 kestrel 队列中读取一条消息,然后再发送一条带有“消息 id”的消息,这个 id 是由 Kestrel 提供的。使用 SpoutOutputCollector 发送消息一般是这样的形式:
collector.emit(new Values("field1", "field2", 3) , msgId);
随后,tuple 会被发送到对应的 bolt 中去,在这个过程中,Storm 会很小心地跟踪创建的消息树。如果 Storm 检测到某个 tuple 被完整处理, Storm 会根据 Spout 提供的“消息 id”调用最初发送 tuple 的 Spout 任务的 ack 方法。对应的,Storm 在检测到 tuple 超时之后就会调用 fail 方法。注意,对于一个特定的 tuple,响应(ack)和失败处理(fail)都只会由最初创建这个 tuple 的任务执行。也就是说,及时 Spout 在集群中有很多个任务,某个特定的 tuple 也只会由创建它的那个任务——而不是其他的任务——来处理成功或失败的结果。
我们再以 KestrlSpout 为例来看看在消息的可靠性处理中 Spout 做了什么。在 KestrlSpout 从 Kestrel 队列中取出一条消息时,可以看作它“打开”了这条消息。也就是说,这条消息实际上并没有从队列中真正地取出来,而是保持着一个“挂起”状态,等待消息处理完成的信号。在挂起状态的消息不回被发送到其他的消费者中。另外,如果消费者(客户端)断开了连接,所有处于挂起状态的消息都会重新放回到队列中。在消息“打开”的时候 Kestrel 会给客户端同时提供消息体数据和一个唯一的 id。KestrelSpout 在使用 SpoutOutputCollector 发送 tuple 的时候就会把这个唯一的 id 当作“消息 id”。一段时间之后,在 KestrelSpout 的 ack 或者 fail 方法被调用的时候,KestrelSpout 就会通过这个消息 id 向 Kestrel 请求将消息从队列中移除(对应 ack 的情况)或者将消息重新放回队列(对应 fail 的情况)。
使用 Storm 的可靠性机制的时候你需要注意两件事:首先,在 tuple 树中创建新节点连接时务必通知 Storm;其次,在每个 tuple 处理结束的时候也必须向 Storm 发出通知。通过这两个操作,Storm 就能够检测到 tuple 树会在何时完成处理,并适时地调用 ack 或者 fail 方法。Storm 的 API 提供了一种非常精确的方式来实现着两个操作。
你可以使用 OutputCollector 的 fail 方法来使得位于 tuple 树根节点的 Spout tuple 立即失败。例如,你的应用可以在建立数据库连接的时候抓取异常,并且在异常出现的时候立即让输入 tuple 失败。通过这种立即失败的方式,原始 Spout tuple 就会比等待 tuple 超时的方式响应更快。
每个待处理的 tuple 都必须显式地应答(ack)或者失效(fail)。因为 Storm 是使用内存来跟踪每个 tuple 的,所以,如果你不对每个 tuple 进行应答或者失效,那么负责跟踪的任务很快就会发生内存溢出。
此处仅讲述可靠性保障的基础概念,更多信息请参见并发编程网。
Storm并发(Worker|Executor|Task)
一个Topology可能会在多个Supervisor节点中拥有多个worker,一个worker可能包含多个Executor,一个Executor可能包含多个Task。Task是执行业务逻辑的最小任务逻辑实体。
工作进程(Workers)
worker运行在工作节点上(supervisor节点),是被Supervisor守护进程创建的用来干活的进程。拓扑是在一个或多个工作进程(worker processes)中运行的。每个工作进程都是一个实际的 JVM 进程,并且执行拓扑的一个子集。一个Worker里面不会运行属于不同的topology的执行任务。
线程(Executors)
Executor可以理解成一个Worker进程中的工作线程。一个Executor中只能运行隶属于同一个component(spout/bolt)的task。一个Worker进程中可以有一个或多个Executor线程。在默认情况下,一个Executor运行一个task。
任务(Tasks)
在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每个任务都与一个执行线程相对应。数据流分组可以决定如何由一组任务向另一组任务发送元组。你可以在 TopologyBuilder 的 setSpout 方法和 setBolt 方法中设置 Spout/Bolt 的并行度。
一个Topology的并发度是指Executor的数量,下面展示了一个并发度为10的Topology图。
该Topology包含一个Spout和两个Bolts,且分配了两个Workers(上下两个矩形框)。其中Spout的并发度为2,也就是说该Spout分配了两个Executor,Green Bolt的并发度为2,Yellow Bolt的并发度为6,所以总共是10。BlueSpout包含2个Tasks,Green Bolt包含4个Tasks,Yellow Bolt包含6个Tasks。Task从属于Executor。Storm尽可能的在Worker之间进行均衡负载,所以此处每个Worker有5个Executors,即5个线程。
Storm建议在配置并行度时,Task的数量设置为Executor的倍数,Executor数量为worker的倍数,利于Storm负载均衡。
关于Worker、Executor和Task的配置如下文:
设置worker数量
Config配置中设置:TOPOLOGY_WORKERS
代码中设置:Config.setNumWorkers(Num_Of_Worker)
设置executor数量
TopologyBuilder.setSpout(arg0,arg1,Num_Of_Executor)
设置task数量
Configuration option: TOPOLOGY_TASKS
ComponentConfigurationDeclarer.setNumTasks(Num_OF_Task)
如果不指定,则一个Executor对应一个Task,这也是我们常用的。
Task数量>=Executor数量
一个典型的例子:
//2 workers 2 Executors 4Tasks
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout);
Config conf = new Config();
conf.setNumWorkers(2);
Topology工作流程
Storm集群工作流程概括来说主要包括以下几个步骤:
任务Topology提交到Storm集群的Nimbus节点——Zookeeper生成workerbeats存储Topology所有Worker的的心跳信息——Supervisor轮询Zookeeper集群,在Zookeeper的assignment中存储了Topology任务分配信息、代码存储目录、任务之间的关系等(次数介绍的较粗糙,待完善)。
Topology 中的流处理时,调用方法的过程如图3下图所示。
Topology 方法调用的过程有如下一些要点:
- 每个组件(Spout 或者Bolt)的构造方法和declareOutputFields 方法都只被调用一次。
- open 方法和prepare 方法被调用多次。在入口函数中设定的setSpout 或者setBolt 中的并行度参数是指Executor 的数量,是负责运行组件中的Task 的线程数量,此数量是多少,上述两个方法就会被调用多少次,在每个Executor 运行时调用一次。
- nextTuple 方法和execute 方法是一直运行的,nextTuple 方法不断发射Tuple,Bolt 的execute 不断接收Tuple 进行处理。只有这样不断地运行,才会产生无界的Tuple 流,体现实时性。这类似于Java 线程的run 方法。
- 提交一个Topology 之后,Storm 创建Spout/Bolt 实例并进行序列化。之后,将序列化的组件发送给所有任务所在的节点(即Supervisor 节点),在每一个任务上反序列化组件。
- Spout 和Bolt 之间、Bolt 和Bolt 之间的通信,通过ZeroMQ 的消息队列实现。
- 图3-16 没有列出ack 和fail 方法,在一个Tuple 成功处理之后,需要调用ack 方法来标记成功,否则调用fail 方法标记失败,重新处理该Tuple。
四、Storm集群部署
Storm分为本地模式和集群模式,本地模式比较简单,直接吧storm里的jar包加入到自己的工程classpath即可编码测试,无需任何额外配置。相较于集群模式,对编码的区别仅在于,定义Topology之后,不是提交到集群而是通过LocalCluster运行,代码如下:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("testA", conf, builder.createTopology());
Utils.sleep(120000);
cluster.killTopology("testA");
cluster.shutdown();
下面主要讲一下集群模式的部署安装。由于使用的是Storm最新的版本storm-1.0.1,安装过程和之前在网上看到的略有不同,主要参考的是Storm官网安装说明,详情参见:Setting up a Storm cluster。
注:笔者使用的是3台CentOS Linux主机。
部署Storm集群流程总结:
- 部署Zookeeper集群
- 在Nimbus节点和Supervisor节点机器上安装依赖环境
- 下载和解压Storm发布包
- 修改storm.yaml配置
- 启动Nimbus节点和Supervisor节点
- 提交topology任务,测试storm集群
部署Zookeeper集群
部署Zookeeper集群主要参考的是apache zookeeper官网上的部署流程,详情参见Clustered (Multi-Server) Setup。此处笔者部署了一个3个节点的Zookeeper集群,其实一台机器也完全可以,单节点的Zookeeper集群部署参见:Single Server and Developer Setup。不过根据Zookeeper安装文档,集群中机器节点数需要为奇数,以保证可以顺利的选出集群中的Leader节点。
step1 安装JDK1.7
最新的Storm要求JDK至少是1.7版本。安装JDK之后,记得配置环境变量。网上很多地方都有Linux下安装介绍,此处不再赘述。
step2 调整Java堆内存大小
此项配置影响Storm的性能,需设置合理的内存值,通常4G的内存机器推荐设置为3G左右。
step3 安装Zookeeper包
下载linux下Zookeeper安装包,解压。推荐奖zookeeper安装目录下bin目录添加到PATH,方便启停。每一台准备作为Zookeeper集群的机器都要执行此操作。
step4 配置Zookeeper
在Zookeeper安装目录/conf下新建zoo.cfg文件,添加下列配置:
tickTime=2000
dataDir=/var/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
通常需要改动的地方只有dataDir和zoo1~zoo3。
dataDir是Zookeeper需要存储一些信息的实际路径,根据自己的习惯和情况设置。建立好dataDir之后,还要去每台机器的该目录下创建文件myid,名字就叫“myid”,且里面只有一个阿拉伯数字。对于server.1=zoo1:2888:3888机器,则数字为1,对于server.2=zoo2:2888:3888为2。
zoo1~zoo3是Zookeeper集群中每台机器的IP地址。如果你不止3台,则需要继续添加。
每一台准备作为Zookeeper集群的机器都要执行此操作。
step5 启动Zookeeper集群
执行bin下的启动脚本
./zkServer.sh start-foreground
第一次启动建议执行上述指定,在前台启动,可以看到启动消息,确认没问题之后,可以省去-foreground,在后台启动,免得占用终端进程。
此操作每一台机器都要执行。因为Zookeeper集群在启动时要互相通信,而往往启动第一台时,与其他通信都无法建立,因为其他机器还没有注册监听,所以一开始会报出一些无法连接的异常,可以先忽略,为每台机子都执行启动脚本之后,便不会再报此异常。
顺利的话,几台机器依次启动起来,可以通过在每台机器上执行
./zkServer status
来查看当前机器节点的状态,并可看到当前节点是Follower节点还是Leader节点。
推荐阅读关于Zookeeper快速失败机制的文章:A few notes about Zookeeper deployment:
It’s critical that you run Zookeeper under supervision, since Zookeeper is fail-fast and will exit the process if it encounters any error case. See here for more details.
It’s critical that you set up a cron to compact Zookeeper’s data and transaction logs. The Zookeeper daemon does not do this on its own, and if you don’t set up a cron, Zookeeper will quickly run out of disk space. See here for more details.
至此,Zookeeper集群部署完毕,保持其状态为启动,继续后面的步骤。
安装Storm本地依赖
在所有准备作为Storm集群的机器上安装JDK1.7和Python2.6.6,具体的安装步骤自己去百度,不再赘述。
安装Storm
在所有Storm机器节点上下载并解压Storm安装包,推荐将bin目录添加到PATH方便启停Storm,此步骤简单,不在赘述。
修改配置文件
在所有机器节点的Storm安装目录/conf文件下,修改storm.yaml文件,进行配置。关键配置项解释:
storm.zookeeper.servers: Zookeeper集群主机列表。Storm根据此配置去轮询Zookeeper集群。此处将Zookeeper集群所有机器IP地址配上(注意-和“IP”之间有空格):
storm.zookeeper.servers:
- "111.222.333.444"
- "555.666.777.888"
- "aaa.bbb.ccc.ddd"
storm.local.dir: Strom的Nimbus节点和Supervisor节点在此存储一些状态信息,如jar包,配置等。此处需要有写权限。
storm.local.dir: "/mnt/storm"
nimbus.seeds: 告诉Storm那台机器是Nimbus节点。
nimbus.seeds: ["111.222.333.44"]
此处可配置多台机器,在中括号内用逗号隔开,不过笔者并未测试,根据官方文档的说明,此处可以配置个列表,从而解决Nimbus节点单点的“缺陷”。
原文:
You’re encouraged to fill out the value to list of machine’s FQDN. If you want to set up Nimbus H/A, you have to address all machines’ FQDN which run nimbus. You may want to leave it to default value when you just want to set up ‘pseudo-distributed’ cluster, but you’re still encouraged to fill out FQDN.
supervisor.slots.ports: 配置每台工作机器上有多少个worker节点,不同的worker在所给定的端口下监听等待分配的任务。
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
启动Storm集群
基本的配置到此结束,下面启动Storm集群来检验上述配置。
启动Nimbus节点: 在Nimbus节点机器上执行
bin/storm nimbus
启动Supervisor节点: 在每个Supervisor节点机器上执行
bin/storm supervisor
启动Storm UI: 在Nimbus节点上启动执行
bin/storm ui
之后在Nimbus节点机器的浏览器上输入localhost:8080,则可以看到当前Storm集群的一些状况。
提交Topology任务: 在Nimubus节点上,提交打包好的topology的jar包,执行
bin/storm jar xxxx.jar xxx.xxx.xxx.XxxTopology arg0 arg1 arg2
xxxx为jar包名称,XxxTopology为入口类, arg为参数,arg可为空。
提交之后,若没有问题,可在UI中看到当前topology的执行情况。
注:本文部分内容主要参考自并发编程网和Apache Storm官网,有些内容和图片直接从别的地方搬过来的,结合自己写的demo以及实际部署经验,经过自己的理解加工成文,不足之处请指正。