首发Strom/JStorm:流式计算框架的应用
Posted 海数据实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了首发Strom/JStorm:流式计算框架的应用相关的知识,希望对你有一定的参考价值。
【导读】
本内容总结自影合众科技有限公司资深Java工程师&架构师黄鑫在2017年7月26日海数据技术沙龙第34期上所分享的内容。
以下为分享内容:
大家好,我是黄鑫,今天很荣幸和大家一起来探讨一下Storm在实际当中的应用。
今天的分享内容分为如下几个部分:Storm介绍、Strom原理和架构、Topology并行度(如何做到窃听扩展以及高并发)、消息可靠性(对于流式计算消息可靠性Strom是如何保证的)、Trident Topology、业务场景(会以目前公司的业务实战举例),Reference以及Q&A。
1
Storm相关介绍
Strom是一个大家很熟悉的流式计算框架了,批量的流式计算我们一般指Hadoop、Spark,这些基本上是以一些大批量数据去做计算,由于目前商业应用上越来越高的要求,便产生了例如Strom、Flink以及阿里的JStrom等等。
以下我们来介绍一下Strom的原理以及架构,这里我将这些组件罗列出来了,分为逻辑概念(Topology、Spout、Bolt、Tuple、Stream grouping)以及Storm基础架构当中的一些其自己定义的组件(Nimbus、Zookeeper、Supervisor、Worker、Executor以及Task)。
这是一个关于Topology的计算模型图,在这个Strom中我们把所有的计算模型称之为一个Topology,spout为我们进入的数据流的源,比如Kafka或者致力于RPC的数据都是可以直接通过spout进来然后通过tuple的这种数据进入一个子节点(bolt),最后通过tuple输出到输出结果(bolt)并且落地,比如进入大数据的平台HDFS当中,或者进入到关系型数据库当中。
这个是Strom整个的框架,目前在1.0之后的Strom已经支持了nimbus的HA了,它的所有的master和supervisor之间的通信全部是通过zookeeper的方式。
Strom使用起来和Hadoop和相像,nimbus相当于Hadoop当中的ResourceManager,它负责所有资源的调配以及对supervisor的监控,supervisor相当于Hadoop当中的NodeManager,是计算进程的管理者,work相当于Hadoop中的Map以及Reduce。
他们的整个框架都是大同小异的。
2
Topology并行度
这是一个Topology的举例
这是一个很简单的WordCount的例子,有两个输入源(Sentence Spout和Sentence Spout1)输入后分为Split,然后根据每一个单词Count, Count之后Report出来。
图中为WordCount的代码,从代码中我们可以看出一种语言可以对接出很多种语言。
定义一个参数2,意味着可以共同执行两个线程,这就是所谓的并行度。
ShuffleGrouping指的是BOLT中关注的数据来自于哪里也就是上一个节点的数据源来自于哪里。
这个图中也是刚才所举的例子,他的并行度由刚才代码中最后一行中的NumWorkers体现(setNumWorkers(2)),两个Works相当于两个JVM,也就是两个进程(如图)。
两个Workers从属于Supervior也就是我们的Node,它会自己根据机器节点的这些资源执行相对比较LB的方式把需要计算的Worker部署到共同的Supervisor上去,slots为定义的节点当中的能够容纳计算的一个进程。
Worker就是一个JVM,相当于Topology。
下面与大家一起交流一下Topology的流程:
Topology对于Storm来说是分成两块来管理它整个集群以及他所以资源的,一部分数据使用Zookeeper的方式把所有的数据存到zookeeper里面去,还有一部分数据存储在了本地(Nimbus路径下以及Spervisor路径下)
在Zookeeper当中有一个根目录/storm,根目录下包括nimbuses/leader-lock以及supervisor,这三个是在启动nimbus和supervisor的时候创建的。
Nimbus本地路径下的blobs以及nimbus/inbox(临时文件上传至此),会把文件相应的copy到blobs中。
Supervisor接受到任务后,首先要从Nimbus中的blobs中把文件拷到—supervisor/stormdist当中并且尝试启动workers,JVM以及所有的进程信息会写到Supervisor的workers当中,workers-users是由用户登录/启动的。
这个图是刚才整个过程的一个体现。
部署了storm之后,我们来关注消息可靠性,基于流式计算的主要为At-Most-Once、At-Least-Once以及Exactly-Once。
Strom原生的支持是At-Least-Once,至少消费一次,如果失败了还可进行消费,但中途有状况的话不保证第二次消费的时候准确性。
Tuple Tree:从Setence Spout出来的第一个消息记为Tuple1,在Split Bolt中可能会分成四个单词那么可能会有4个Tuple,在Count阶段也有可能产生4个Tuple。
Spout支持ack()和fail(),Spout是一个语言,所有的计算节点处理完了之后(异或结果已经表明的前提下),后台调用一个进程,ack()方法返回给Spout消息,证明调用成功。
SpoutOutputCollector:发消息,通过emit()方法将消息发出。
OutputCollector:所有的tuple数据处理的方式。
Acker bolt:默认开启的Acker线程,通过他保持消息的可靠性,在TimeCacheMap这个数据结构当中存储buckets。
为了保证Strom有且只有一次的消费方式,提出了Trident Topology,是对于原生的Topology更强的抽象层,它可以支持状态的流数据处理,并且支持持久化以及事务性。
牵扯到事务都会涉及一个问题,如何保证性能,strom把所有的tuple信息使用batch机制实现他所有的高吞吐。
Topology最核心的两个概念就是Trident Spout和Trident State。
Trident Spout代表数据源,源端数据,确保在输入数据时是符合我们设计的。
Trident State 对于这种状态的处理,任何我们需要更改状态时,我们能够及时的去处理掉。
基于以上两点来保证我们的事务性。
在Spout中存在四个接口,其中需要注意的是IPartitionedTridentSpout和IPpauqePartitionedTridentSpout的区别,两者都保证的批次数据的且保证数据有且消费一次;但是前者保证的是batch数据重复消费时的数据是一模一样的,但是后者处理的这一次batch的数据和上一次batch的数据可能是有区别的。
图中列出了Spout和State的组合。
3
源码分析
把传进来的tuple数据做成批次然后进行处理,将数据emit到一个tuple当中。
_spout=spout;是我们要自己实现的一个TridentSpout
-emitter=spout.getEmitter(_txStateId,conf,context);便是spout中的一部分,emitter的作用便是把整个需要emit的操作全部封到类当中。
在此做了一个调用。
两者只有emit的方式不一样,一个是OpaqueEmitter一个是TransactionalEmitter。
asOpaqueEmitter():emitPartition方法以及Batch
asTransactionalEmitter():failFastNewPartitionBatch以及reEmitParititonBatch。
这两部分的实现区别在于Transaction直接调用第一个方法,失败了就是失败了;
如果失败了需要继续消费则使用第二个方法。
State分为begin,commit两个状态。
从图中大体看到实现方式
这一块属于Storm提供给我们的框架性的东西,可以用或不用,backing.multiGet(keys)是后端自己去实现的,multiGet记录了上一次的状态,他会根据上一次的状态对于这一批次的数据进行组合操作。
从此框架来看,数据不能是通过数据库进行update原生值的。
它会记录上一次的结果和本次的结果,所以他会出来例如上一次成功入到数据库中的值是4,这一批次batch之后的值是3,最后结果变为了7,那么再次batch之后的值为2,他最后的结果会是4+2=6而不是7了。
将值set到数据库中。
4
实际应用
我们在实际当中应用到storm的场景是我们有两千多个节点,数据库运用的是mysql,需求是我们要对两千多个节点的票房数据进行聚合操作。
之前的操作是全部运用Hadoop的方式去计算,但是结果会导致每一次的聚合计算就需要一个小时左右才能完成结果。
我们可能会出现一种现象,例如2016年的买票信息在2017年10月才发现某一张票缺失了,然后在2017年再去计算,这种情况下运用Hadoop只能重新算,浪费大量时间,并且不符合客户要求。
后来我们使用mysql数据库解析binlog格式把所有的数据解析出来,然后通过阿里开源的canal解析数据,将其解析成我们需要的值,然后传到Kafka,我们再通过Kafka将数据对接到Storm中进行整合计算,将数据对接到BI库中。
这是本次分享所涉及到的一些概念。
谢谢大家!
5
问答交流
Q1:Strom已经是一个很老的框架了,但是在Spark很多新框架的层出不迭中,Storm的使用依然很活跃是为什么?
A1:Strom从初代发表一直到现在更新到1.x的版本跨度很长,其次确实在于工程化来说例如公司战略方面等等的影响;具体说到为什么Storm到现在还能这么火,因为Strom对于实时性要求非常强的,对于业务性数据的计算,Strom还是最可靠的。
Q2:Strom和Spark相对起来,Strom学习周期较短学习成本较低;而Spark学习周期较长,成本较高;目前的情况还是会用Storm,但从长远来考虑,还是计划把业务从Storm迁移到Spark上面吗?
A2:技术选型的时候会进行各方面的权衡,作为技术人员我们都希望是技术越新,可靠性越高,但迫于产品各方面的压力,我们不得不做一些取舍,我们目前在调研JStorm,看它是不是能更好地满足我们目前的业务需求,因为我们更倾向与所有的逻辑都使用Java来编写,这样对我们来说能更好的进行控制。
Q3: 目前使用2000千多台MySQL节点,每天的处理的数据量大概有多少呀?
A3: 一家的数据量在100M到200M之间,合起来的话数据量在几百个G左右。
Q4:介绍一下目前贵公司使用的4台服务器的配置情况?
A4:每台机器的内存是128G,10个CPU,磁盘是24TB。
Q5:我们是全国移动事业技术单位,目前公司有1000多张表,一年的数据量在16PB左右,每天大概有20%的数据需要处理,使用Strom的这种架构还需要做其它的扩充吗?
A5:如果你们单位在数据存储方面没有什么问题的话,那么可以使用Storm的这套架构,但是呢,我的建议是把数据做一下拆分,使用多个Storm或者Spark平台来进行数据处理,这样效率会更高一些。
Q6:Storm的处理好数据之后,数据是怎么存放的以及与前端交互是怎么样实现的?
A6:我们在Storm处理完数据后,会把使用Kafka等相关组件对处理结果进行划分然后分步地把数据存入数据里面,前端可以直接从数据库里面取得相关数据。
【嘉宾介绍】
黄鑫(Jason)先生入行多年,专注于基于Java的分布式高并发后端开发、云平台PaaS层研发和大数据基础体系机构设计,为资深Java工程师&架构师。
曾工作于Oracle,现就职于影合众科技有限公司。
对JVM、大数据Hadoop体系以及storm流式计算框架有深入研究。
现负责影合众大数据平台的基于流式计算框架的实时BI体系和建模平台。
本文为海数据社区原创作品,未经授权不得转载,转载请与海数据小秘书联系
扫描“海数据学院”二维码获取更多资讯
以上是关于首发Strom/JStorm:流式计算框架的应用的主要内容,如果未能解决你的问题,请参考以下文章
Storm/JStorm之TopologyBuilder源码阅读