实时流式计算——Storm

Posted 大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时流式计算——Storm相关的知识,希望对你有一定的参考价值。

实时流式计算——Storm(一)大 数 据

专注于前沿大数据案例资讯实时流式计算——Storm(一)

  1. Storm 简介    

   Storm 是由专业数据分析公司 BackType 开发的一个 分布式 实时数据处理软件,可以简单、高效、可靠地处理大量的数据流。Twitter 在 2011 年 7 月收购该公司,并于 2011 年 9 月底正式将 Storm 项目开源。Storm 被托管在 GitHub 上,目前最新版本是 0.9.0.1。软件核心部分使用 Clojure 开发,外围部分使用 Java 开发。Clojure(发音同 closure)是 Lisp 语言的一种现代方言。类似于 Lisp,Clojure 支持一种功能性编程风格,但 Clojure 还引入了一些特

性来简化多线程编程(一种对创建 Storm 很有用的特性) 。

    Storm 三大应用举例:

     * 信息处理(Stream Processing)

        Storm可以用来实时处理新数据和更新数据库

     * 连续计算(Continuous Computation)

        Storm可进行连续查询并把结果即时反馈给客户端。比如把Twitter热门话题发送到浏览器

     *分布式远程程序调用(Distributed RPC)

        Storm 可用来并行处理密集查询。Storm 的拓扑结构是一个等待调用信息分布函数,        当它收到一条信息后,会对查询进行计算,并返回查询结果。举个例子 Distributed        RPC 可以做并行搜索或者处理大集合的数据。

       Storm 可以方便在一个计算机集群中编写与可扩展的实时计算,Storm之于实时处理,好比hadoop的批处理。Storm保证每个消息都会得到处理,而且它很快。

       当然 Storm 也存在一些 缺点:开源版的 Storm 有个最大的缺点,就是只支持单 Nimbus节点,一旦 Nimbus 节点挂掉就只能重启,存在单点失效的问题;Clojure 是一个在 JVM 上运行的动态函数式编程语言,优势在于流程计算,Storm 的核心部分由 Clojure 编写,虽然性能上提高不少但同时也提高了维护成本。

1.1 主要特点

    Storm拥有低延时、高扩展、高容错、高性能、分布式等特性,可以保证消息不丢失,消息处理严格有序。Storm的主要特点如下:

    *拥有简单的编程模型 类似于MapReduce降低了并行批处理的复杂性,Storm降低了进行实时处理的复杂度

    *可以使用各种编程语言     你可以在 Storm之上使用各种编程语言。 默认支持Clojure、    Java、Ruby 和 Python。要增加对其他语言的支持,只需实现一个简单的 Storm 通    信协议即可。

    *容错性 Storm会管理工作进程和节点故障。

    *水平扩展 计算是在多个线程、进程和服务器之间并行进行的。

    *可靠的消息处理 Storm保障每个消息至少得到一次完整的处理。在任务失败时,会从消息源重新重试消息

    *快速   系统的设计保证了消息能得到快速的处理, 使用 ØMQ 作为其底层消息队列。

    *本地模式  Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这可以让开发者快速进行开发和单元测试。

    Storm集群是有一个主节点和多个工作节点组成。主节点运行一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。工作节点运行一个名为“Supervisor”的进程,用于监控工作开始并终止工作进程。“Nimbus”和“Supervisor”都能快速失败(失败之后能快速重启并会像什么事情都没发生过一样),而且无状态(状态信息都保存在zookeeper),这样一来他们就变得十分健壮,两者的协调工作都是由Apache ZooKeeper来完成。


实时流式计算——Storm(一)

第一步:client 提交拓扑到主节点

第二个:Nimbus 针对该拓扑建立本地的目录根据topology 的配置计算 task,分配 task,并在zookeeper上新建assignments存储task和supervisor机器节点中worker

第三个:在 zookeeper 上创建 taskbeats 节点来监控 task 的心跳,启动 topology。

第四步:Supervisor 去 zookeeper 上获取分配的 tasks,启动多个 woker 进行,每个 woker 生成 task,一个 task 一个线程;根据 topology 信息初始化建立task之间的连接;Task 和 Task 之间是通过 ZeroMQ 管理的;后整个拓扑运行起来。

          Storm 的术语包括 Stream、Spout、Bolt、Task、Worker、Stream Grouping 和 opology。Stream 是被处理的数据。Spout 是数据源。Bolt 处理数据。Task 是运行于 Spout 或 Bolt 中的线程。Worker 是运行这些线程的进程。Stream Grouping 规定了 Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为 Shuffle),或者根据字段值分配(术语为 Fields),或者 广播(术语为 All),或者总是发给一个 Task(术语为 Global) ,也可以不关心该数据(术语为 None) ,或者由自定义逻辑来决定(术语为 Direct) 。Topology 是由 Stream Grouping 连接起来的 Spout 和 Bolt 节点网络。

       可以和 Storm 相提并论的系统有 Esper、Streambase、HStreaming 和 Yahoo S4。其中和Storm 最接近的就是 S4。 两者 最大的区别在于 Storm 会保证消息得到处理。 这些系统中有的拥有 内建数据存储层, 这是Storm所 没有的, 如果需要持久化, 可以使用一个类似于Cassandra或 Riak 这样的外部数据库。

1.2 基本概念

     首先我们通过一个 Storm 和 Hadoop 的对比来了解 Storm 中的基本概念。


实时流式计算——Storm(一)

接下来我们再来具体看一下这些概念。

      Nimbus:负责资源分配和任务调度。

      Supervisor:负责接受 nimbus 分配的任务,启动和停止属于自己管理的 worker 进

程。

      Worker:运行具体处理组件逻辑的进程。

      Task:worker 中每一个 spout/bolt 的线程称为一个 task。在 Storm0.8 之后,task 不再与物理线程对应,同一个 spout/bolt 的 task 可能会共享一个物理线程,该线程称为 executor。

下面这个图描述了以上几个角色之间的关系

实时流式计算——Storm(一)

  * Topology:Storm 中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。

    * Spout: 在一个 topology 中产生源数据流的组件。 通常情况下 spout 会从外部数据源中读取数据,然后转换为 topology 内部的源数据。Spout 是一个主动的角色,其接口中有个 nextTuple()函数,Storm 框架会不停地调用此函数,用户只要在其中生成源数据即可。

    * Bolt:在一个 topology 中接受数据然后执行处理的组件。Bolt 可以执行过滤、函数操作、合并、写数据库等任何操作。Bolt 是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。

    * Tuple:一次消息传递的基本单元。本来应该是一个 key-value 的 map,但是由于各个组件间传递的 tuple 的字段名称已经事先定义好,所以 tuple 中只要按序填入各个 value 就行了,所以就是一个 value list.

    *  Stream:源源不断传递的 tuple 就组成了 stream。

    Hadoop 是实现了 MapReduce 的思想,将数据切片计算来处理大量的离线数据。Hadoop处理的数据必须是已经存放在 hdfs 上或者类似 hbase 的数据库中,所以 Hadoop 实现的时候是通过 移动计算到这些存放数据的机器上来提高效率的;而 Storm 不同,Storm 是一个流计算框架,处理的数据是实时消息队列中的,所以需要我们写好一个 topology 逻辑放在那,接收进来的数据来处理,所以是通过 移动数据平均分配到机器资源来获得高效率。   Hadoop 的优点是处理数据量大(瓶颈是硬盘和 namenode,网络等) ,分析灵活,可以

通过实现 dsl,mdx 等拼接 Hadoop 命令或者直接使用 hive,pig 等来灵活分析数据。适应对大量维度进行组合分析。其缺点就是慢:每次执行前要分发 jar 包,Hadoop 每次 map 数据超出阙值后会将数据写入本地文件系统,然后在 reduce 的时候再读进来。

     Storm 的优点是全内存计算,因为内存寻址速度是硬盘的百万倍以上,所以 Storm 的速度相比较 Hadoop 非常快 (瓶颈是内存, cpu) 。 其缺点就是不够灵活: 必须要先写好 topology结构来等数据进来分析

     Storm 关注的是数据 多次处理一次写入, 而 Hadoop 关注的是数据 一次写入, 多次查询使用。Storm 系统运行起来后是 持续不断的,而 Hadoop 往往只是在 业务需要时调用数据。

1.3 基础架构

       Storm集群类似于一个hadoop集群,在hadoop运行“MapReduce job”,在Storm运行“topologies”。“Job”和“topologies”本身有很大不同。其中关键区别是,MapReduce的工作最终完成,而topologies处理消息永远保持(或者直到你杀了它)。Storm集群中有两类节点:主节点和工作节点。节点上运行一个叫做“Nimbus”的守护进程,也就是类似 Hadoop “JobTracker” 。Nimbus负责在集群分发的代码,将任务分配给其他机器工作节点,和故障检测。

     每个工作节点运行一个“supervisor“的守护进程,supervisor监听分配给他的机器,根据Nimbus的委派在必要时关闭启动工作进程  。每个工作进程执行topology的一个子集。一个运行中topology由运行在很多机器上的工作进程组成。

实时流式计算——Storm(一)

      Nimbus与supervisor之间协调工作是通过zookeeper集群提供状态信息。Nimbus与upervisor之间不能链接和通信。他们所有状态信息都维护在zookeeper中或本地磁盘中;这样就意味着kill -9 nimbus 或supervisor之后,不需要备份,这种设计使Storm具有令人难以置信的稳定性。

     Storm实现一种数据流模型,其中数据持续地流经一个转换实体网络。一个数据流的抽象称为”流“,这是一个无限的元祖序列。元祖(tuple)就像一种使用一些附加的序列化代码来表示标准数据类型(比如整数、 浮点和字节数组) 或用户定义类型的结构。每个流由一个唯一 ID 定义,这个 ID 可用于构建数据源和接收器(sink)的拓扑结构。流起源于 喷嘴(spout) ,Spout 将数据从外部来源流入 Storm 拓扑结构中。

实时流式计算——Storm(一)

接收器(或提供转换的实体)称为 螺栓(bolt) ) 。螺栓实现了一个流上的单一转换和一个Storm 拓扑结构中的所有处理。Bolt 既可实现 MapReduce 之类的传统功能,也可实现更复杂的操作(单步功能) ,比如过滤、聚合或与数据库等外部实体通信。典型的 Storm 拓扑结构会实现多个转换,因此需要多个具有独立元组流的 Bolt。Bolt 和 Spout 都实现为 Linux 系统中的一个或多个任务。

        但是,Storm 架构中一个最有趣的特性是有保障的消息处理。Storm 可保证一个 Spout发射出的每个元组都会处理;如果它在超时时间内没有处理,Storm 会从该 Spout 重新发射该元组。 此功能需要一些聪明的技巧来在拓扑结构中跟踪元素, 也是 Storm 的重要的附加价值之一。    除了支持可靠的消息传送外,Storm 还使用 ØMQ(ZeroMQ)最大化消息传送性能(删除中间排队,实现消息在任务间的直接传送) 。ØMQ 合并了拥塞检测并调整了它的通信,以优化可用的带宽。    

      Storm 0.9.0.1 版本的第一亮点是引入 Netty Transport。 Storm 网络传输机制实现可插拔形式,当前包含 两种方式:原来的 ØMQ 传输,以及新的 Netty 实现;在早期版本中(0.9.x之前的版本) ,Storm 只支持 ØMQ 传输,由于 ØMQ 是一个本地库(native library),对平台的依赖性较高,要完全正确安装还是有一定挑战性。而且版本之间的差异也比较大;NettyTransport 提供了纯 JAVA 的替代方案,消除了 Storm 的本地库依赖,且比 ØMQ 的网络传输性能快一倍以。

1.4 工作原理

实时流式计算——Storm(一)

【 计算拓补:Topologies】

        一个实时计算应用程序的逻辑在 Storm 里面被封装到 topology 对象里面,我把它叫做    计算拓补。Storm 里面的 topology 相当于 Hadoop 里面的一个 MapReduce Job,它们的关键    区别是:一个 MapReduce Job 最终总是会结束的,然而一个 Storm 的 topoloy 会一直运行,    除非你显式的杀死它。一个 Topology 是 Spouts 和 Bolts 组成的图状结构,而链接 Spouts 和Bolts 的则是 Stream groupings。

    【 消息流:Streams】

        消息流是 Storm 里面的最关键的抽象。 一个消息流是一个没有边界的 tuple 序列, 而这    些 tuples 会被以一种分布式的方式并行地创建和处理。 对消息流的定义主要是对消息流里面的 tuple 的定义, 我们会给 tuple 里的每个字段一个名字。 并且不同 tuple 的对应字段的类型 必须一样。 也就是说: 两个 tuple 的第一个字段的类型必须一样, 第二个字段的类型必须  一样,但是第一个字段和第二个字段可以有不同的类型。在默认的情况下,tuple 的字段类    型可以是:integer、long、short、byte、string、double、float、boolean 和 byte array。你还可    以自定义类型——只要你实现对应的序列化器。 每个消息流在定义的时候会被分配给一个 id,因为单向消息流是那么的普遍, OutputFieldsDeclarer 定义了一些方法让你可以定义一个 stream 而不用指定这个 id。在这种 情况下这个 stream 会有个默认的 id: 1.

    【 消息源 :Spouts】

        消息源 Spouts 是 Storm 里面一个 topology 里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向 topology 里面发出消息:tuple。 消息源 Spouts 可以是可靠的也可 以是不可靠的。 一个可靠的消息源可以重新发射一个 tuple 如果这个 tuple 没有被 Storm 成功的处理, 但是一个不可靠的消息源 Spouts 一旦发出一个 tuple 就把它彻底忘了——也就不可能再发了。

    消息源可以发射多条消息流 stream。使OutFieldsDeclarer.declareStream 来定义多个stream,然后使用 SpoutOutputCollector 来发射指定的 sream。Spout 类里面最重要的方法是 nextTuple 要么发射一个新的 tuple 到 topology 里面或者简    单的返回如果已经没有新的 tuple 了。要注意的是 nextTuple 方法不能 block Spout 的实现, 因为 Storm 在同一个线程上面调用所有消息源 Spout 的方法。另外两个比较重要的 Spout 方法是 ack 和 fail。 Storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack, 否则调用 fail。Storm 只对可靠的 spout 调用 ack 和 fail。

    【 消息处理者 :Bolts】

        所有的消息处理逻辑被封装在 bolts 里面。 Bolts 可以做很多事情:过滤、聚合、查询 数据库等等。

        Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤, 从而也就需

    要经过很多 Bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出 每个图片的转发数量。 第二步找出转发最多的前 10 个图片。 (如果要把这个过程做得更具有扩展性那么可能需要更多的步骤) 。

        Bolts 可以发射多条消息流,使用 OutputFieldsDeclarer.declareStream 定义 stream,使用OutputCollector.emit 来选择要发射的 stream。

        Bolts 的主要方法是 execute,它以一个 tuple 作为输入,Bolts 使用 OutputCollector 来发射 tuple, Bolts 必须要为它处理的每一个 tuple 调用 OutputCollector 的 ack 方法, 以通知 Storm这个 tuple 被处理完成了。从而我们通知这个 tuple 的发射者 Spouts。 一般的流程是: Bolts 处理一个输入 tuple,发射 0 个或者多个 tuple,然后调用 ack 通知 Storm 自己已经处理过这个 tuple 了。Storm 提供了一个 IBasicBolt 会自动调用 ack。

    【Stream groupings: 消息分发策略】

        定义一个Topology的其中一步是定义每个bolt接受什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配给bolt上面多少个tasks.

          Shuffle Grouping: 随机分组,随机派发 stream 里面的 tuple,保证每个 bolt 接收到的 tuple 数目相同。

          Fields Grouping: 按字段分组,比如按 userid 来分组,具有同样 userid 的 tuple 会被分到相同的 Bolts,而不同的 userid 则会被分配到不同的 Bolts。

          All Grouping: 广播发送,对于每一个 tuple,所有的 Bolts 都会收到。

          Global Grouping: 全局分组,这个 tuple 被分配到 Storm 中的一个 bolt 的其中一个task。再具体一点就是分配给 id 值最低的那个 task。

          Non Grouping: 不分组,表示 stream 不关心到底谁会收到它的 tuple。目前这种分组和 Shuffle grouping 是一样的效果,有一点不同的是 Storm 会把这个 bolt 放到这 个 bolt 的订阅者 同一个线程里面去执行。

          Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消 息的发送者指定由消息接收者的哪个 task 处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect  方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的 taskid (OutputCollector.emit 方法也会返回 taskid) 。

实时流式计算——Storm(一)数据工匠                         专注  专业   聚焦  


              

以上是关于实时流式计算——Storm的主要内容,如果未能解决你的问题,请参考以下文章

storm 流式计算框架

海数据技术沙龙——Flink:新一代流式计算框架&Storm/JStorm: 流式计算框架的应用

大数据流式计算三种框架:Storm,Spark和Samza

[第8期] 流式计算之 Storm 爱用者初尝 Flink

技术干货流式计算 Spark Streaming 和 Storm 对比

Storm 第一章 核心组件及编程模型