jstorm 核心

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jstorm 核心相关的知识,希望对你有一定的参考价值。

参考技术A

生成Topology

IRichSpout
IRichSpout 为最简单的Spout接口

其中注意:
=>spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个=>task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
=>open是当task起来后执行的初始化动作
=>close是当task被shutdown后执行的动作
=>activate 是当task被激活时,触发的动作
=>deactivate 是task被deactive时,触发的动作
=>nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
=>ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
=>fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
=>declareOutputFields, 定义spout发送数据,每个字段的含义
=>getComponentConfiguration 获取本spout的component 配置

Bolt

其中注意:
=>bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
=>prepare是当task起来后执行的初始化动作
=>cleanup是当task被shutdown后执行的动作
=>execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
=>declareOutputFields, 定义bolt发送数据,每个字段的含义
=>getComponentConfiguration 获取本bolt的component 配置

打包

提交jar
xxxx.jar 为打包后的jar
com.alibaba.xxxx.xx 为入口类,即提交任务的类
parameter即为提交参数

Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。

Acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。
要实现ack机制:

阿里自己的Jstorm会提供
public interface IFailValueSpout void fail(Object msgId, List<object>values);
这样更合理一些, 可以直接取得系统cache的msg values

ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理

在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。
l或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
注意,我开始以为如果继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,但是我错了,程序直接异常停止了
这里我以分布式程序入门案例worldcount为例子吧。

有人问到Storm 是怎么处理重复的tuple?
因为Storm 要保证tuple 的可靠处理,当tuple 处理失败或者超时的时候,spout 会fail并重新发送该tuple,那么就会有tuple 重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。不过也有一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后
续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用 mysql 、MemCached 或者 Redis 根据逻辑主键来去重。
(3)使用bloom filter 做过滤,简单高效。

在学习storm的过程中,有不少人对storm的Spout组件中的ack及fail相关的问题存在困惑,这里做一个简要的概述。

Storm保证每一个数据都得到有效处理,这是如何保证的呢?正是ack及fail机制确保数据都得到处理的保证,但是storm只是提供给我们一个接口,而具体的方法得由我们自己来实现。例如在spout下一个拓扑节点的bolt上,我们定义某种情况下为数据处理失败,则调用fail,则我们可以在fail方法中进行数据重发,这样就保证了数据都得到了处理。其实,通过读storm的源码,里面有讲到,有些类(BaseBasicBolt?)是会自动调用ack和fail的,不需要我们程序员去ack和fail,但是其他Bolt就没有这种功能了。

jstorm之于storm

关于流处理框架,在先前的文章汇总已经介绍过Strom,今天学习的是来自阿里的的流处理框架JStorm。简单的概述Storm就是:JStorm 比Storm更稳定,更强大,更快,Storm上跑的程序,一行代码不变可以运行在JStorm上。直白的将JStorm是阿里巴巴的团队基于Storm的二次开发产物,相当于他们的Tengine是基于Ngix开发的一样。

技术分享

阿里拥有自己的实时计算引擎

  1. 类似于hadoop 中的MR

  2. 开源storm响应太慢

  3. 开源社区的速度完全跟不上Ali的需求

  4. 降低未来运维成本

  5. 提供更多技术支持,加快内部业务响应速度

现有Storm无法满足一些需求

  1. 现有storm调度太简单粗暴,无法定制化

  2. Storm 任务分配不平衡

  3. RPC OOM一直没有解决

  4. 监控太简单

  5. 对ZK 访问频繁

JStorm相比Storm更稳定

  1. Nimbus 实现HA:当一台nimbus挂了,自动热切到备份nimbus

  2. 原生Storm RPC:Zeromq 使用堆外内存,导致OS 内存不够,Netty 导致OOM;JStorm底层RPC 采用netty + disruptor保证发送速度和接受速度是匹配的

  3. 新上线的任务不会冲击老的任务:新调度从cpu,memory,disk,net 四个角度对任务进行分配,已经分配好的新任务,无需去抢占老任务的cpu,memory,disk和net

  4. Supervisor主线

  5. Spout/Bolt 的open/prepar

  6. 所有IO, 序列化,反序列化

  7. 减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描。

JStorm相比Storm调度更强大

  1. 彻底解决了storm 任务分配不均衡问题

  2. 从4个维度进行任务分配:CPU、Memory、Disk、Net

  3. 默认一个task,一个cpu slot。当task消耗更多的cpu时,可以申请更多cpu slot

  4. 默认一个task,一个memory slot。当task需要更多内存时,可以申请更多内存slot

  5. 默认task,不申请disk slot。当task 磁盘IO较重时,可以申请disk slot

  6. 可以强制某个component的task 运行在不同的节点上

  7. 可以强制topology运行在单独一个节点上

  8. 可以自定义任务分配,提前预约任务分配到哪台机器上,哪个端口,多少个cpu slot,多少内存,是否申请磁盘

  9. 可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源

JStorm相比Storm性能更好

JStorm 0.9.0 性能非常的好,使用netty时单worker 发送最大速度为11万QPS,使用zeromq时,最大速度为12万QPS。

  • JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的

  • 在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%

性能提升的原因:

  1. Zeromq 减少一次内存拷贝

  2. 增加反序列化线程

  3. 重写采样代码,大幅减少采样影响

  4. 优化ack代码

  5. 优化缓冲map性能

  6. Java 比clojure更底层

JStorm的其他优化点

  1. 资源隔离。不同部门,使用不同的组名,每个组有自己的Quato;不同组的资源隔离;采用cgroups 硬隔离

  2. Classloader。解决应用的类和Jstorm的类发生冲突,应用的类在自己的类空间中

  3. Task 内部异步化。Worker 内部全流水线模式,Spout nextTuple和ack/fail运行在不同线程

 

原文:https://my.oschina.net/infiniteSpace/blog/308401

以上是关于jstorm 核心的主要内容,如果未能解决你的问题,请参考以下文章

JStorm学习

JStorm介绍

JStorm:任务调度

jstorm之于storm

jstorm

Jstorm