Storm调度

Posted 石头-Stone

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm调度相关的知识,希望对你有一定的参考价值。

一、storm计算框架

 

性能优化的第一步就是找到瓶颈在哪里,从瓶颈处入手,解决关键点问题,事半功倍。

除了通过系统命令查看CPU使用,jstack查看堆栈的调用情况以外,还可以通过Storm自身提供的信息来对性能做出相应的判断。

Storm UI中,对运行的topology提供了相应的统计信息

三个重要参数:

·Execute latency:消息(tuple)的平均处理时间,单位是毫秒。

·Process latency:消息从收到到被ack掉所花费的时间,单位为毫秒。如果没有启用Acker机制,那么Process latency的值为0

·Capacity:计算公式为Capacity = Spout 或者 Bolt 调用 execute 方法处理的消息数量 × 消息平均执行时间/时间区间。如果这个值越接近1,说明Spout或者Bolt基本一直在调用 execute 方法,因此并行度不够,需要扩展这个组件的 Executor数量。

 

////////////////////////////////////////////////////////////////////////////////

一、

Storm可以很容易地在集群中横向拓展它的计算能力,它会把整个运算过程分割成多个独立的tasks在集群中进行并行计算。在Storm中,一个task就是运行在集群中的一个SpoutBolt实例。

Topology的运行涉及到四种组件:

Node(machines):集群中的节点,就是这些节点一起工作来执行Topology

Worker(JVMs):一个worker就是一个独立的JVM进程。每个节点都可以通过配置运行一个或多个worker,一个Topology可以指定由多少个worker来执行。

Executors(threads):一个worker JVM中运行的线程。一个worker进程可以执行一个或多个executor线程。一个Executor可以运行一个“组件”内的多个tasksStorm默认一个每个executor分配一个task

Task(bolt/spout实例)Tasks就是spoutsbolts的实例,它具体是被executor线程处理的。  

二、

Storm实例:wordcount
Topology默认执行情况如下: 一个节点会为Topology分配一个worker,这个worker会为每个Task启一个executor。                                                                                                  

2.1 Topology增加worker
两种途径增加workers:通过程序设置或storm rebalance命令。

Config config = new Config(); 

config.setNumWorkers(2);
注意:在LocalMode下不管设置几个worker,最终都只有一个worker进程。


2.2 配置executorstasks
taskspoutbolt的实例,一个executor线程处理多个tasktask是真正处理具体数据的一个过程。Task的数量在整个topology运行期间一般是不变的,但是组件的Executor是有可能发生变化的即有thread<=task


2.2.1 设置executorthread)数量
每个组件产生多少个Executor程序中设置或storm rebalance命令

builder.setSpout(SENTENCE_SPOUT_ID,spout, 2); 


2.2.2 设置task的数量
每个组件创建多少个task程序中设置或storm rebalance命令
builder.setBolt(SPLIT_BOLT_ID,splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);

builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID,newFields("word"));
如果一开始分配2workers,Topology的运行情况如下: 

 


三、

一个实际topology的全景,topology由三个组件组成,

一个SpoutBlueSpout

两个BoltGreenBoltYellowBolt 

 


如上图,我们配置了两个worker进程,两个BlueSpout线程,两个GreenBolt线程和六个YellowBolt线程,那么分布到集群中的话,每个工作进程都会有5executor线程。具体代码:

Config conf = new Config(); conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout(blue-spout, new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt(green-bolt, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(blue-spout);

topologyBuilder.setBolt(yellow-bolt, new YellowBolt(), 6) .shuffleGrouping(green-bolt);

StormSubmitter.submitTopology( mytopology, conf, topologyBuilder.createTopology() );

 

Storm中也有一个参数来控制topology的并行数量: TOPOLOGY_MAX_TASK_PARALLELISM: 这个参数可以控制一个组件上Executor的最大数量。它通常用来在本地模式测试topology的最大线程数量。当然我们也可以在代码中设置:

config.setMaxTaskParallelism(). 

 

四、

如何改变一个运行topology中的Parallelism
Storm中一个很好的特性就是可以在topology运行期间动态调制worker进程或Executor线程的数量而不需要重启topology。这种机制被称作rebalancing。 我们有两种方式来均衡一个topology

1通过Storm web UI

2:通过storm rebalance命令

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 

 

二、storm调度算法

DefaultScheduler默认调度算法,采用轮询的方式将系统中的可用资源均匀地分配给topology也不是绝对均匀会先将其它topology不需要的资源重新收集起来EventScheduler:和DefaultScheduler差不多,会先将其它topology不需要的资源重新收集起来

IsolationScheduler:用户可定义topology的机器资源,storm分配的时候会优先分配这些机器,以保证分配给该topology的机器只为这一个topology服务

 

DefaultScheduler

1调用clusterneedsSchedualerTopologies方法获得需要进行任务分配的topologies

开始分别对每一个topology进行处理

2调用clustergetAvailableSlots方法获得当前集群可用的资源,以<node,port>集合的形式返回,赋值给available-slots

3获得当前topologyexecutor信息并转化为<start-t ask-id,end-task-id>集合存入all-executors,根据topology计算executors信息,采用compute-executors算法

4调用DefaultSchedulerget-alive-assigned-node+port->executors方法获得该topology已经获得的资源,返回<node+port,executor>集合的形式存入alive-assigned

5调用slot-can-reassignalive-assigned中的slots信息进行判断,选出其中能被重新分配的slot存入变量can-reassigned这样可用的资源就由available-slotscan-reassigned两部分组成

6计算当前topology能使用的全部slot数目total-slots--to-usemin(topologyNumWorker,available-slots+can-reassigned)如果total-slots--to-use>当前已分配的slots数目,则调用bad-slots方法计算可被释放的slot

7调用clusterfreeSlots方法释放计算出来的bad-slot

8最后调用EventSchedulerschedule-topologies-evenly进行分配

先计算集群中可供分配的slot资源,并判断当前已分配给运行Topologyslot是否需要重新分配,然后对可分配的slot进行排序计算Topologyexecutor信息,最后将资源平均地分配给Topology

 

 

接下来我们提交3topology