storm性能优化

Posted edgedance

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm性能优化相关的知识,希望对你有一定的参考价值。

Storm 性能优化

目录


  1. 场景假设
  2. 调优步骤和方法
  3. Storm 的部分特性
  4. Storm 并行度
  5. Storm 消息机制
  6. Storm UI 解析
  7. 性能优化

场景假设


在介绍 Storm 的性能调优方法之前,假设一个场景:
项目组部署了3台机器,计划运行且仅运行 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小规模实验集群,集群的配置情况如下表:

| 主机名 | 硬件配置 | 角色描述 |
|: ---------------- :|: ---------- -- |: ---------- :|
|hd01| 2CPUs, 4G RAM, 2TB 机械硬盘 | nimbus, supervisor, ui, kafka, zk |
|hd02 | 2CPUs, 4G RAM, 2TB 机械硬盘 | supervisor, kafka, zk |
|hd03 | 2CPUs, 4G RAM, 2TB 机械硬盘 | supervisor, kafka, zk |

现有一个任务,需要实时计算订单的各项汇总统计信息。订单数据通过 kafka 传输。在 Storm 中创建了一个 topology 来执行此项任务,并采用 Storm kafkaSpout 读取该 topic 的数据。kafka 和 Storm topology 的基本信息如下:

  • kafka topic partitions = 3
  • topology 的配置情况:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new kafkaSpout(), 3);
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout");
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order"));

Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那么,在此假设下,Storm topology 的数据怎么分发?性能如何调优?这就是下文要讨论的内容,其中性能调优是最终目的,数据分发即 Storm 的消息机制,则是进行调优前的知识储备。

调优步骤和方法


Storm topology 的性能优化方法,整体来说,可依次划分为以下几个步骤:

  1. 硬件配置的优化
  2. 代码层面的优化
  3. topology 并行度的优化
  4. Storm 集群配置参数和 topology 运行参数的优化

其中第一点不是讨论的重点,无外乎增加机器的硬件资源,提高机器的硬件配置等,但是这一步却也不能忽略,因为机器配置太低,很可能后面的步骤怎么调优都无济于事。

Storm 的一些特性和原理,是进行调优的必要知识储备

Storm 的部分特性

目前 Storm 的最新版本为 2.0.0-SNAPSHOT。该版本太新,未经过大量验证和测试,因此本文的讨论都基于 2.0 以前的版本。Storm 有如下几个重要的特性:

  • DAG
  • 常驻内存,topology 启动后除非 kill 掉否则一直运行
  • 提供 DRPC 服务
  • Pacemaker(1.0以后的新特性)心跳守护进程,常驻内存,比ZooKeeper性能更好
  • 采用了 ZeroMQ 和 Netty 作为底层框架
  • 采用了 ACK/fail 方式的 tuple 追踪机制
    并且 ack/fail 只能由创建该tuple的task所承载的spout触发

了解这些机制对优化 Storm 的运行性能有一定帮助

Storm 并行度


Storm 是一个分布式的实时计算软件,各节点,各组件间的通信依赖于 zookeeper。从组件的角度看,Storm 运作机制构建在 nimbus, supervisor, woker, executor, task, spout/bolt 之上,如果再加上 topology,有时也可以称这些组件为 Concepts(概念)。详见官网介绍文章 http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html
http://storm.apache.org/releases/current/Tutorial.html

在介绍 Storm 并行度之前,先概括地了解下 Storm 的几个概念,此处假定读者有一定的 Storm 背景知识,至少曾经跑过一个 topology 实例。

nimbus 和 supervisor

nimbus 是 Storm 集群的管理和调度进程,一个集群启动一个 nimbus,主要用于管理 topology,执行rebalance,管理 supervisor,分发 task,监控集群的健康状况等。nimbus 依赖 zookeeper 来实现上述职责,nimbus 与 supervisor 等其他组件并没有直接的沟通。运行 nimbus 的节点成为主节点,运行 supervisor 的节点成为工作节点,nimbus 向 supervisor 分派任务,因此 Storm 集群也是一个 master/slave 集群。简单来说,nimbus 就是工头,supervisor 就是工人,nimbus 通过 zookeeper 来管理 supervisor。

supervisor 是一个工作进程,负责监听 nimbus 分派的任务。当它接到任务后,会启动一个 worker 进程,由 worker 运行 topology 的一个子集。为什么说是子集呢?因为当一个 topology 提交到集群后,nimbus 便会根据该 topology 的配置(此处假定 numWorker=3),将 topology 分配给3个 worker 并行执行(正常情况下是这样,也有不是均匀分配的,比如有一个 supervisor 节点内存不足了)。如果刚好集群有3个 supervisor,则每个 supervisor 会启动1个 worker,即一个节点启动一个 worker(一个节点只能有一个 supervisor 有效运行)。因此,worker 进程运行的是 topology 的一个子集。supervisor 同样通过 zookeeper 与 nimbus 进行交流,因此 nimbus 和 supervisor 都可以快速失败/停止,因为所有的状态信息都保存在本地文件系统的 zookeeper 中, 当失败停止运行后,只需要重新启动 nimbus 或 supervisor 进程以快速恢复。当然,如果集群中正在工作的 supervisor 停止了,其上运行着的 topology 子集也会跟着停止,不过一旦 supervisor 启动起来,topology 子集又立刻恢复正常了。

nimbus 和 supervisor 的协作关系
worker

worker 是一个JVM进程,由 supervisor 启动和关闭。当 supervisor 接到任务后,会根据 topology 的配置启动若干 worker,实际的任务执行便由 worker 进行。worker 进程会占用固定的可由配置进行修改的内存空间(默认768M)。通常使用 conf.setNumWorkers() 函数来指定一个 topolgoy 的 worker 数量。

executor

executor 是一个线程,由 worker 进程派生(spawned)。executor 线程负责根据配置派生 task 线程,默认一个 executor 创建一个 task,可通过 setNumTask() 函数指定每个 executor 的 task 数量。executor 将实例化后的 spout/bolt 传递给 task。

task

task 可以说是 topology 最终的实际的任务执行者,每个 task 承载一个 spout 或 bolt 的实例,并调用其中的 spout.nexTuple(),bolt.execute() 等方法,而 spout.nexTuple() 是数据的发射器,bolt.execute() 则是数据的接收方,业务逻辑的代码基本上都在这两个函数里面处理了,因此可以说 task 是最终搬砖的苦逼。

topology

topology 中文翻译为拓扑,类似于 hdfs 上的一个 mapreduce 任务。一个 topology 定义了运行一个 Storm 任务的所有必要元件,主要包括 spout 和 bolt,以及 spout 和 bolt 之间的流向关系。

topology 结构
并行度

什么是并行度?在 Storm 的设定里,并行度大体分为3个方面:

  1. 一个 topology 指定多少个 worker 进程并行运行;
  2. 一个 worker 进程指定多少个 executor 线程并行运行;
  3. 一个 executor 线程指定多少个 task 并行运行。

一般来说,并行度设置越高,topology 运行的效率就越高,但是也不能一股脑地给出一个很高的值,还得考虑给每个 worker 分配的内存的大小,还得平衡系统的硬件资源,以避免浪费。
Storm 集群可以运行一个或多个 topology,而每个 topology 包含一个或多个 worker 进程,每个 worer 进程可派生一个或多个 executor 线程,而每个 executor 线程则派生一个或多个 task,task 是实际的数据处理单元,也是 Storm 概念里最小的工作单元, spout 或 bolt 的实例便是由 task 承载。

worker executor task 的关系

为了更好地解释 worker、executor 和 task 之间的工作机制,我们用官网的一个简单 topology 示例来介绍。先看此 topology 的配置:

Config conf = new Config();
conf.setNumWorkers(2); // 为此 topology 配置两个 worker 进程

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // blue-spout 并行度=2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // green-bolt 并行度=2
               .setNumTasks(4)  // 为此 green-bolt 配置 4 个 task
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)  // yellow-bolt 并行度=6
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);

从上面的代码可以知道:

  • 这个 topology 装备了2个 worker 进程,也就是同样的工作会有 2 个进程并行进行,可以肯定地说,2个 worker 肯定比1个 worker 执行效率要高很多,但是并没有2倍的差距;
  • 配置了一个 blue-spout,并且为其指定了 2 个 executor,即并行度为2;
  • 配置了一个 green-bolt,并且为其指定了 2 个 executor,即并行度为2;
  • 配置了一个 yellow-bolt,并且为其指定了 6 个 executor,即并行度为6;

大家看官方给出的下图:

一个 topology 的结构图示

可以看出,这个图片完整无缺地还原了代码里设定的 topology 结构:

  • 图左最大的灰色方框,表示这个 topology;
  • topology 里面刚好有两个白色方框,表示2个 worker 进程;
  • 每个 worker 里面的灰色方框表示 executor 线程,可以看到2个 worker 方框里各有5个 executor,为什么呢?因为代码里面指定的 spout 并行度=2,green-bolt并行度=2,yellow-bolt并行度=6,加起来刚好是10,而配置的 worker 数量为2,那么自然地,这10个 executor 会均匀地分配到2个 worker 里面;
  • 每个 executor 里面的黄蓝绿(写着Task)的方框,就是最小的处理单元 task 了。大家仔细看绿色的 Task 方框,与其他 Task 不同的是,两个绿色方框同时出现在一个 executor 方框内。为什么会这样呢?大家回到上文看 topology 的定义代码,topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4),这里面的 setNumTasks(4) 表示为该 green-bolt 指定了4个 task,且 executor 的并行度为2,那么自然地,这4个 task 会均匀地分配到2个 executor 里面;
  • 图右的三个圆圈,依次是蓝色的 blue-spout,绿色的 green-bolt 和黄色的 yellow-bolt,并且用箭头指示了三个组件之间的关系。spout 是数据的产生元件,而 green-bolt 则是数据的中间接收节点,yellow-bolt 则是数据的最后接收节点。这也是 DAG 的体现,有向的(箭头不能往回走)无环图。

参考
http://storm.apache.org/releases/1.0.1/Understanding-the-parallelism-of-a-Storm-topology.html
http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

一个 topology 的代码较完整例子
TopologyBuilder builder = new TopologyBuilder();

BrokerHosts hosts       = new ZkHosts(zkConns);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, clintId);
spoutConfig.scheme      = new SchemeAsMultiScheme(new StringScheme());

/** 指示 kafkaSpout 从 kafka topic 最后记录的 ofsset 开始读取数据 */
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 3); // spout 并行度=3
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); // FilterBolt 并行度=3
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); // AlertBolt 并行度=3

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3); // 为此 topology 配置3个 worker 进程
conf.setMaxSpoutPending(10000);

try {
    StormSubmitter.submitTopologyWithProgressBar(topology, conf, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

Storm 消息机制


Storm 主要提供了两种消息保证机制(Message Processing Guarantee)

  • 至少一次 At least once
  • 仅且一次 exactly once

其中 exactly once 是通过 Trident 方式实现的(exactly once through Trident)。两种模式的选择要视业务情况而定,有些场景要求精确的仅且一次消费,比如订单处理,决不能允许重复的处理订单,因为很可能会导致订单金额、交易手数等计算错误;有些场景允许一定的重复,比如页面点击统计,访客统计等。总之,不管何种模式,Storm 都能保证数据不会丢失,开发者需要关心的是,如何保证数据不会重复消费。

At least once 的消息处理机制,在运用时需要格外小心,Storm 采用 ack/fail 机制来追踪消息的流向,当一个消息(tuple)发送到下游时,如果超时未通知 spout,或者发送失败,Storm 默认会根据配置策略进行重发,可通过调节重发策略来尽量减少消息的重复发送。一个常见情况是,Storm 集群经常会超负载运行,导致下游的 bolt 未能及时 ack,从而导致 spout 不断的重发一个 tuple,进而导致消息大量的重复消费。

在与 Kafka 集成时,常用 Storm 提供的 kafkaSpout 作为 spout 消费 kafka 中的消息。Storm 提供的 kafkaSpout 默认有两种实现方式:至少一次消费的 core Storm spouts 和仅且一次消费的 Trident spouts :(We support both Trident and core Storm spouts)。

在 Storm 里面,消息的处理,通过两个组件进行:spout 和 bolt。其中 spout 负责产生数据,bolt 负责接收并处理数据,业务逻辑代码一般都写入 bolt 中。可以定义多个 bolt ,bolt 与 bolt 之间可以指定单向链接关系。通常的作法是,在 spout 里面读取诸如 kafka,mysql,redis,elasticsearch 等数据源的数据,并发射(emit)给下游的 bolt,定义多个 bolt,分别进行多个不同阶段的数据处理,比如第一个 bolt 负责过滤清洗数据,第二个 bolt 负责逻辑计算,并产生最终运算结果,写入 redis,mysql,hdfs 等目标源。

Storm 将消息封装在一个 Tuple 对象里,Tuple 对象经由 spout 产生后通过 emit() 方法发送给下游 bolt,下游的所有 bolt 也同样通过 emit() 方法将 tuple 传递下去。一个 tuple 可能是一行 mysql 记录,也可能是一行文件内容,具体视 spout 如何读入数据源,并如何发射给下游。

如下图,是一个 spout/bolt 的执行过程:


spout/bolt 的执行过程

spout -> open(pending状态) -> nextTuple -> emit -> bolt -> execute -> ack(spout) / fail(spout) -> message-provider 将该消息移除队列(complete) / 将消息重新压回队列

ACK/Fail

上文说到,Storm 保证了数据不会丢失,ack/fail 机制便是实现此机制的法宝。Storm 在内部构建了一个 tuple tree 来表示每一个 tuple 的流向,当一个 tuple 被 spout 发射给下游 bolt 时,默认会带上一个 messageId,可以由代码指定但默认是自动生成的,当下游的 bolt 成功处理 tuple 后,会通过 acker 进程通知 spout 调用 ack 方法,当处理超时或处理失败,则会调用 fail 方法。当 fail 方法被调用,消息可能被重发,具体取决于重发策略的配置,和所使用的 spout。

对于一个消息,Storm 提出了『完全处理』的概念。即一个消息是否被完全处理,取决于这个消息是否被 tuple tree 里的每一个 bolt 完全处理,当 tuple tree 中的所有 bolt 都完全处理了这条消息后,才会通知 acker 进程并调用该消息的原始发射 spout 的 ack 方法,否则会调用 fail 方法。

ack/fail 只能由创建该 tuple 的 task 所承载的 spout 触发

默认情况下,Storm 会在每个 worker 进程里面启动1个 acker 线程,以为 spout/bolt 提供 ack/fail 服务,该线程通常不太耗费资源,因此也无须配置过多,大多数情况下1个就足够了。


ack/fail 示意
Worker 间通信

上文所说是在一个 worker 内的情况,但是 Storm 是一个分布式的并行计算框架,而实现并行的一个关键方式,便是一个 topology 可以由多个 worker 进程分布在多个 supervisor 节点并行地执行。那么,多个 worker 之间必然是会有通信机制的。nimbus 和 supervsor 之间仅靠 zookeeper 进行沟通,那么为何 worker 之间不通过 zookeeper 之类的中间件进行沟通呢?其中的一个原因我想,应该是组件隔离的原则。worker 是 supervisor 管理下的一个进程,那么 worker 如果也采用 zookeeper 进行沟通,那么就有一种越级操作的嫌疑了。

Worker 间通信

大家看上图,一个 worker 进程装配了如下几个元件:

  • 一个 receive 线程,该线程维护了一个 ArrayList,负责接收其他 worker 的 sent 线程发送过来的数据,并将数据存储到 ArrayList 中。数据首先存入 receive 线程的一个缓冲区,可通过 topology.receiver.buffer.size (此项配置在 Storm 1.0 版本以后被删除了)来配置该缓冲区存储消息的最大数量,默认为8(个数,并且得是2的倍数),然后才被推送到 ArrayList 中。receive 线程接收数据,是通过监听 TCP的端口,该端口有 storm 配置文件中 supervisor.slots.prots 来配置,比如 6700;
  • 一个 sent 线程,该线程维护了一个消息队列,负责将队里中的消息发送给其他 worker 的 receive 线程。同样具有缓冲区,可通过 topology.transfer.buffer.size 来配置缓冲区存储消息的最大数量,默认为1024(个数,并且得是2的倍数)。当消息达到此阈值时,便会被发送到 receive 线程中。sent 线程发送数据,是通过一个随机分配的TCP端口来进行的。
  • 一个或多个 executor 线程。executor 内部同样拥有一个 receive buffer 和一个 sent buffer,其中 receive buffer 接收来自 receive 线程的的数据,sent buffer 向 sent 线程发送数据;而 task 线程则介于 receive buffer 和 sent buffer 之间。receive buffer 的大小可通过 Conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE 参数配置,sent buffer 的大小可通过 Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE 配置,两个参数默认都是 1024(个数,并且得是2的倍数)。
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             16); // 默认8
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,      16384);

参考
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Storm UI 解析


首页
  • Cluster Summary


    Cluster Summary

| 参数名 | 说明 |
|:----- :|:---- |
| Supervisors | 集群中配置的 supervisor 数量 |
| Used slots | 集群中已用掉的 workers 数量 |
| Free slots | 集群中空闲的 workers 数量 |
| Total slots | 集群中总的的 workers 数量 |
| Executors | 当前集群中总的 Executor 线程数量,该值等于集群中所有 topology 的所有 spout/bolt 配置的 executor 数量之和,其中默认情况下每个 worker 进程还会派生一个 acker executor 线程,也一并计算在内了 |
| Tasks | 当前集群中总的 task 数量,也是所有 executor 派生的 task 数量之和 |

  • Nimbus Summary
    比较简单,就略过了

  • Topology Summary


    Topology Summary

这部分也比较简单,值得注意的是 Assigned Mem (MB),这里值得是分配给该 topolgoy 下所有 worker 工作内存之和,单个 worker 的内存配置可由 Config.WORKER_HEAP_MEMORY_MB 和 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 指定,默认为 768M,另外再加上默认 64M 的 logwritter 进程内存空间,则有 832M。
此处 fast-pay 的值为 2496M = 3*832

  • Supervisor Summary


    Supervisor Summary

此处也比较简单,值得注意的是 slot 和 used slot 分别表示当前节点总的可用 worker 数,及已用掉的 worker 数。

  • Nimbus Configuration


    Nimbus Configuration

可搜索和查看当前 topology 的各项配置参数

topology 页面
  • Topology summary


    Topology summary

此处的大部分配置与上文中出现的意义一样,值得注意的是
Num executors 和 Num tasks 的值。其中 Num executors 的数量等于当前 topology 下所有 spout/bolt 的并行度总和,再加上所有 worker 下的 acker executor 线程总数(默认情况下一个 worker 派生一个 acker executor)。

  • Topology actions


    Topology actions

| 按钮 | 说明 |
|:----- :|:---- |
| Activate | 激活此 topology |
| Deactivate | 暂停此 topology 运行 |
| Rebalance | 调整并行度并重新平衡资源 |
| Kill | 关闭并删除此 topology |
| Debug | 调试此 topology 运行,需要设置 topology.eventlogger.executors 数量 > 0 |
| Stop Debug | 停止调试 |
| Change Log Level | 调整日志级别 |

  • Topology stats


    Topology stats

| 参数 | 说明 |
|:----- :|:---- |
| Window | 时间窗口,比如"10m 0s"表示在topology启动后10m 0s之内 |
| Emitted | 此时间窗口内发射的总tuple数 |
| Transferred | 此时间窗口内成功转移到下一个bolt的tuple数 |
| Complete latency (ms) | 此时间窗口内每个tuple在tuple tree中完全处理所花费的平均时间 |
| Acked | 此时间窗口内成功处理的tuple数 |
| Failed | 此时间窗口内处理失败或超时的tuple数 |

  • Spouts (All time)


    Spouts (All time)

| 参数 | 说明 |
|:----- :|:---- |
| Id | topologoy 中 spout 的名称,一般是在代码里设置的 |
| Executors | 当前分配给此 spout 的 executor 线程总数 |
| Tasks | 当前分配给此 spout 的 task 线程总数 |
| Emitted | 截止当前发射的总tuple数 |
| Transferred | 截止当前成功转移到下一个bolt的tuple数 |
| Complete latency (ms) | 截止当前每个tuple在tuple tree中完全处理所花费的平均时间 |
| Acked | 截止当前成功处理的tuple数 |
| Failed | 截止当前处理失败或超时的tuple数 |

  • Bolts (All time)


    Bolts (All time)

| 参数 | 说明 |
|:----- :|:---- |
| Id | topologoy 中 bolt 的名称,一般是在代码里设置的 |
| Executors | 当前分配给此 bolt 的 executor 线程总数 |
| Tasks | 当前分配给此 bolt 的 task 线程总数 |
| Emitted | 截止当前发射的总tuple数 |
| Transferred | 截止当前成功转移到下一个bolt的tuple数 |
| Complete latency (ms) | 截止当前每个tuple在tuple tree中完全处理所花费的平均时间 |
| Capacity (last 10m) | 性能指标,取值越小越好,当接近1的时候,说明负载很严重,需要增加并行度,正常是在 0.0x 到 0.1 0.2 左右。该值计算方式为 (number executed * average execute latency) / measurement time |
| Execute latency (ms) | 截止当前成功处理的tuple数 |
| Executed | 截止当前处理过的tuple数 |
| Process latency (ms) | 截止当前单个 tuple 的平均处理时间,越小越好,正常也是 0.0x 级别;如果很大,可以考虑增加并行度,但主要以 Capacity 为准 |
| Acked | 截止当前成功处理的tuple数 |
| Failed | 截止当前处理失败或超时的tuple数 |

spout 页面

这个页面,大部分都比较简单,就不一一说明了,值得注意的是下面这个 Tab:

  • Executors (All time)


    Executors (All time)

这个Tab的参数,应该不用解释了,但是要注意看,Emitted,Transferred 和 Acked 这几个参数,看看是否所有的 executor 都均匀地分担了 tuple 的处理工作。

bolt 页面

这个页面与 spout 页面类似,也不赘述了。

参考:这个页面通过 API 的方式,对 UI 界面的参数做了一些解释
http://storm.apache.org/releases/1.0.1/STORM-UI-REST-API.html

Storm debug


Storm 提供了良好的 debug 措施,许多操作可以再 UI 上完成,也可以在命令行完成。比如 Change log level 在不重启 topology 的情况下动态修改日志记录的级别,在 UI 界面上查看某个 bolt 的日志等,当然也可以在命令行上操作。

# 几个与 debug 相关的命令
bin/storm set_log_level [topology name]-l [logger name]=[LEVEL]:[TIMEOUT]
bin/storm logviewer &

下面的参考文章写的很详细,大家有兴趣可以去阅读一下,本文就不再讨论了。

参考
https://community.hortonworks.com/articles/36151/debugging-an-apache-storm-topology.html

性能调优


上文说了这么多,这才进入主题。

1、合理地配置硬件资源

此处暂不讨论

2、优化代码的执行性能

要优化代码的性能,如果严谨一点,首先要有一个衡量代码执行效率的方式。在数学上,通常使用大O函数来衡量一个算法的时间复杂度。我们可以考虑使用大O函数来近似地估计一个代码片段的执行时间:假定一行代码花费1个单位时间,那么代码片段的时间复杂度可以近似地用大O表示为 O(n),其中n表示代码的行数或执行次数。当然,如果代码里引入了其他的类和函数,或者处于循环体内,那么其他类、函数的代码行数,以及循环体内代码的重复执行次数也需要统计在内。这里说到大O函数的概念,在实际中也很少用到,我们往往会用第三方工具来较为准确地计算代码的实际执行时间,但是理解这个概念有助于优化我们的代码。有兴趣的同学可以阅读《算法概论》这本书。

这里顺便举一个斐波那契数列的例子:

/** C代码:用递归实现的斐波那契数列 */
int fibonacci(unsigned int n)
{
    if (n <= 0) return 0;
    if (n == 1) return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
/** C代码:用循环实现的斐波那契数列 */
int fibonacci(unsigned int n)
{
    int r, a, b;
    int i;
    int result[2] = {0, 1};

    if (n < 2) return result[n];

    a = 0;
    b = 1;
    r = 0;
    for (i = 2; i <= n; i++)
    {
        r = a + b;
        a = b;
        b = r;
    }
    return r;
}

观看两个不同实现的例子,第一种递归的方式,当传入的n很大时,代码执行的时间将会呈指数级增长,这时T(n)接近于 O(2^n);第二种循环的方式,即使传入很大的n,代码也可以在较短的时间内执行完毕,这时T(n)接近于O(n),为什么是O(n)呢,比如说n=1000,那么整个算法的执行时间基本集中在那个 for 循环里了,相当于执行了 for 循环内3行代码1000多次,所以差不多是n。这其实就是一种用空间换时间的概念,利用循环代替递归的方式,从而大大地优化了代码的执行效率。

回到我们的 Storm。代码优化,归结起来,应该有这几种:

  • 在算法层面进行优化
  • 在业务逻辑层面进行优化
  • 在技术层面进行优化
  • 特定于 Storm,合理地规划 topology,即安排多少个 bolt,每个 bolt 做什么,链接关系如何

在技术层面进行优化,手法就非常多了,比如连接数据库时,运用连接池,常用的连接池有 alibaba 的 druid,还有 redis 的连接池;比如合理地使用多线程,合理地优化JVM参数等等。这里举一个工作中可能会遇到的例子来介绍一下:

在配置了多个并行度的 bolt 中,存取 redis 数据时,如果不使用 redis 线程池,那么很可能会遇到 topology 运行缓慢,spout 不断重发,甚至直接挂掉的情况。首先 redis 的单个实例并不是线程安全的,其次在不使用 redis-pool 的情况下,每次读取 redis 都创建一个 redis 连接,同创建一个 mysql 连接一样,在连接 redis 时所耗费的时间相较于 get/set 本身是非常巨大的。

/**
 * redis-cli 操作工具类

以上是关于storm性能优化的主要内容,如果未能解决你的问题,请参考以下文章

storm知识点

storm知识点

spark实时计算性能优化

Spark性能优化案例

优化片段着色器

storm trident 的介绍与使用