jstorm 核心
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jstorm 核心相关的知识,希望对你有一定的参考价值。
参考技术A生成Topology
IRichSpout
IRichSpout 为最简单的Spout接口
其中注意:
=>spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个=>task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
=>open是当task起来后执行的初始化动作
=>close是当task被shutdown后执行的动作
=>activate 是当task被激活时,触发的动作
=>deactivate 是task被deactive时,触发的动作
=>nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
=>ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
=>fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
=>declareOutputFields, 定义spout发送数据,每个字段的含义
=>getComponentConfiguration 获取本spout的component 配置
Bolt
其中注意:
=>bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
=>prepare是当task起来后执行的初始化动作
=>cleanup是当task被shutdown后执行的动作
=>execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
=>declareOutputFields, 定义bolt发送数据,每个字段的含义
=>getComponentConfiguration 获取本bolt的component 配置
打包
提交jar
xxxx.jar 为打包后的jar
com.alibaba.xxxx.xx 为入口类,即提交任务的类
parameter即为提交参数
Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。
Acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。
要实现ack机制:
阿里自己的Jstorm会提供
public interface IFailValueSpout void fail(Object msgId, List<object>values);
这样更合理一些, 可以直接取得系统cache的msg values
ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。
l或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
注意,我开始以为如果继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,但是我错了,程序直接异常停止了
这里我以分布式程序入门案例worldcount为例子吧。
有人问到Storm 是怎么处理重复的tuple?
因为Storm 要保证tuple 的可靠处理,当tuple 处理失败或者超时的时候,spout 会fail并重新发送该tuple,那么就会有tuple 重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。不过也有一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后
续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用 mysql 、MemCached 或者 Redis 根据逻辑主键来去重。
(3)使用bloom filter 做过滤,简单高效。
在学习storm的过程中,有不少人对storm的Spout组件中的ack及fail相关的问题存在困惑,这里做一个简要的概述。
Storm保证每一个数据都得到有效处理,这是如何保证的呢?正是ack及fail机制确保数据都得到处理的保证,但是storm只是提供给我们一个接口,而具体的方法得由我们自己来实现。例如在spout下一个拓扑节点的bolt上,我们定义某种情况下为数据处理失败,则调用fail,则我们可以在fail方法中进行数据重发,这样就保证了数据都得到了处理。其实,通过读storm的源码,里面有讲到,有些类(BaseBasicBolt?)是会自动调用ack和fail的,不需要我们程序员去ack和fail,但是其他Bolt就没有这种功能了。
jstorm之于storm
关于流处理框架,在先前的文章汇总已经介绍过Strom,今天学习的是来自阿里的的流处理框架JStorm。简单的概述Storm就是:JStorm 比Storm更稳定,更强大,更快,Storm上跑的程序,一行代码不变可以运行在JStorm上。直白的将JStorm是阿里巴巴的团队基于Storm的二次开发产物,相当于他们的Tengine是基于Ngix开发的一样。
阿里拥有自己的实时计算引擎
-
类似于hadoop 中的MR
-
开源storm响应太慢
-
开源社区的速度完全跟不上Ali的需求
-
降低未来运维成本
-
提供更多技术支持,加快内部业务响应速度
现有Storm无法满足一些需求
-
现有storm调度太简单粗暴,无法定制化
-
Storm 任务分配不平衡
-
RPC OOM一直没有解决
-
监控太简单
-
对ZK 访问频繁
JStorm相比Storm更稳定
-
Nimbus 实现HA:当一台nimbus挂了,自动热切到备份nimbus
-
原生Storm RPC:Zeromq 使用堆外内存,导致OS 内存不够,Netty 导致OOM;JStorm底层RPC 采用netty + disruptor保证发送速度和接受速度是匹配的
-
新上线的任务不会冲击老的任务:新调度从cpu,memory,disk,net 四个角度对任务进行分配,已经分配好的新任务,无需去抢占老任务的cpu,memory,disk和net
-
Supervisor主线
-
Spout/Bolt 的open/prepar
-
所有IO, 序列化,反序列化
-
减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描。
JStorm相比Storm调度更强大
-
彻底解决了storm 任务分配不均衡问题
-
从4个维度进行任务分配:CPU、Memory、Disk、Net
-
默认一个task,一个cpu slot。当task消耗更多的cpu时,可以申请更多cpu slot
-
默认一个task,一个memory slot。当task需要更多内存时,可以申请更多内存slot
-
默认task,不申请disk slot。当task 磁盘IO较重时,可以申请disk slot
-
可以强制某个component的task 运行在不同的节点上
-
可以强制topology运行在单独一个节点上
-
可以自定义任务分配,提前预约任务分配到哪台机器上,哪个端口,多少个cpu slot,多少内存,是否申请磁盘
-
可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源
JStorm相比Storm性能更好
JStorm 0.9.0 性能非常的好,使用netty时单worker 发送最大速度为11万QPS,使用zeromq时,最大速度为12万QPS。
-
JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的
-
在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%
性能提升的原因:
-
Zeromq 减少一次内存拷贝
-
增加反序列化线程
-
重写采样代码,大幅减少采样影响
-
优化ack代码
-
优化缓冲map性能
-
Java 比clojure更底层
JStorm的其他优化点
-
资源隔离。不同部门,使用不同的组名,每个组有自己的Quato;不同组的资源隔离;采用cgroups 硬隔离
-
Classloader。解决应用的类和Jstorm的类发生冲突,应用的类在自己的类空间中
-
Task 内部异步化。Worker 内部全流水线模式,Spout nextTuple和ack/fail运行在不同线程
原文:https://my.oschina.net/infiniteSpace/blog/308401
以上是关于jstorm 核心的主要内容,如果未能解决你的问题,请参考以下文章