Storm调度
Posted 石头-Stone
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm调度相关的知识,希望对你有一定的参考价值。
一、storm计算框架
:性能优化的第一步就是找到瓶颈在哪里,从瓶颈处入手,解决关键点问题,事半功倍。
除了通过系统命令查看CPU使用,jstack查看堆栈的调用情况以外,还可以通过Storm自身提供的信息来对性能做出相应的判断。
在Storm 的UI中,对运行的topology提供了相应的统计信息
三个重要参数:
·Execute latency:消息(tuple)的平均处理时间,单位是毫秒。
·Process latency:消息从收到到被ack掉所花费的时间,单位为毫秒。如果没有启用Acker机制,那么Process latency的值为0。
·Capacity:计算公式为Capacity = Spout 或者 Bolt 调用 execute 方法处理的消息数量 × 消息平均执行时间/时间区间。如果这个值越接近1,说明Spout或者Bolt基本一直在调用 execute 方法,因此并行度不够,需要扩展这个组件的 Executor数量。
////////////////////////////////////////////////////////////////////////////////
一、
Storm可以很容易地在集群中横向拓展它的计算能力,它会把整个运算过程分割成多个独立的tasks在集群中进行并行计算。在Storm中,一个task就是运行在集群中的一个Spout或Bolt实例。
Topology的运行涉及到四种组件:
Node(machines):集群中的节点,就是这些节点一起工作来执行Topology。
Worker(JVMs):一个worker就是一个独立的JVM进程。每个节点都可以通过配置运行一个或多个worker,一个Topology可以指定由多少个worker来执行。
Executors(threads):一个worker JVM中运行的线程。一个worker进程可以执行一个或多个executor线程。一个Executor可以运行一个“组件”内的多个tasks,Storm默认一个每个executor分配一个task。
Task(bolt/spout实例):Tasks就是spouts和bolts的实例,它具体是被executor线程处理的。
二、
Storm实例:wordcount
Topology默认执行情况如下: 一个节点会为Topology分配一个worker,这个worker会为每个Task启一个executor。
2.1 为Topology增加worker
两种途径增加workers:通过程序设置或storm rebalance命令。
Config config = new Config();
config.setNumWorkers(2);
注意:在LocalMode下不管设置几个worker,最终都只有一个worker进程。
2.2 配置executors和tasks
task是spout和bolt的实例,一个executor线程处理多个task,task是真正处理具体数据的一个过程。Task的数量在整个topology运行期间一般是不变的,但是组件的Executor是有可能发生变化的,即有:thread<=task。
2.2.1 设置executor(thread)数量
每个组件产生多少个Executor?在程序中设置或storm rebalance命令
builder.setSpout(SENTENCE_SPOUT_ID,spout, 2);
2.2.2 设置task的数量
每个组件创建多少个task?在程序中设置或storm rebalance命令
builder.setBolt(SPLIT_BOLT_ID,splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID,newFields("word"));
如果一开始分配2个workers,则Topology的运行情况如下:
三、
一个实际topology的全景,topology由三个组件组成,
一个Spout:BlueSpout
两个Bolt:GreenBolt、YellowBolt。
如上图,我们配置了两个worker进程,两个BlueSpout线程,两个GreenBolt线程和六个YellowBolt线程,那么分布到集群中的话,每个工作进程都会有5个executor线程。具体代码:
Config conf = new Config(); conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout(“blue-spout”, new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout”);
topologyBuilder.setBolt(“yellow-bolt”, new YellowBolt(), 6) .shuffleGrouping(“green-bolt”);
StormSubmitter.submitTopology( “mytopology”, conf, topologyBuilder.createTopology() );
Storm中也有一个参数来控制topology的并行数量: TOPOLOGY_MAX_TASK_PARALLELISM: 这个参数可以控制一个组件上Executor的最大数量。它通常用来在本地模式测试topology的最大线程数量。当然我们也可以在代码中设置:
config.setMaxTaskParallelism().
四、
如何改变一个运行topology中的Parallelism
Storm中一个很好的特性就是可以在topology运行期间动态调制worker进程或Executor线程的数量而不需要重启topology。这种机制被称作rebalancing。 我们有两种方式来均衡一个topology:
1:通过Storm web UI
2:通过storm rebalance命令
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
二、storm调度算法
DefaultScheduler:默认调度算法,采用轮询的方式将系统中的可用资源均匀地分配给topology,但也不是绝对均匀。会先将其它topology不需要的资源重新收集起来。EventScheduler:和DefaultScheduler差不多,不会先将其它topology不需要的资源重新收集起来。
IsolationScheduler:用户可定义topology的机器资源,storm分配的时候会优先分配这些机器,以保证分配给该topology的机器只为这一个topology服务。
DefaultScheduler:
1:调用cluster的needsSchedualerTopologies方法获得需要进行任务分配的topologies
开始分别对每一个topology进行处理
2:调用cluster的getAvailableSlots方法获得当前集群可用的资源,以<node,port>集合的形式返回,赋值给available-slots
3:获得当前topology的executor信息并转化为<start-t ask-id,end-task-id>集合存入all-executors,根据topology计算executors信息,采用compute-executors算法。
4:调用DefaultScheduler的get-alive-assigned-node+port->executors方法获得该topology已经获得的资源,返回<node+port,executor>集合的形式存入alive-assigned
5:调用slot-can-reassign对alive-assigned中的slots信息进行判断,选出其中能被重新分配的slot存入变量can-reassigned。这样可用的资源就由available-slots和can-reassigned两部分组成。
6:计算当前topology能使用的全部slot数目total-slots--to-use:min(topology的NumWorker数,available-slots+can-reassigned),如果total-slots--to-use>当前已分配的slots数目,则调用bad-slots方法计算可被释放的slot
7:调用cluster的freeSlots方法释放计算出来的bad-slot
8:最后调用EventScheduler的schedule-topologies-evenly进行分配
:先计算集群中可供分配的slot资源,并判断当前已分配给运行Topology的slot是否需要重新分配,然后对可分配的slot进行排序,再计算Topology的executor信息,最后将资源平均地分配给Topology。
接下来我们提交3个topology