Storm实时处理架构
Posted 每日小新
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm实时处理架构相关的知识,希望对你有一定的参考价值。
Storm🥇
Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,
目录
1、DAG:有向无循环图
它由有限个顶点和有向边组成,每条有向边都从一个顶点指向另一个顶点;从任意一个顶点出发都不能通过这些有向边回到原来的顶点。有向无环图就是从一个图中的任何一点出发,不管走过多少个分叉路口,都没有回到原来这个点的可能性。
2、Storm的特性
1.适用场景广泛:
storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算)
对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。
2、可伸缩性高:
Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。
Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。
3、保证无数据丢失:
实时系统必须保证所有的数据被成功的处理。storm保证每一条消息都会被处理。
4、异常健壮:
storm集群非常容易管理,轮流重启节点不影响应用。
5、容错性好:
在消息处理过程中出现异常, storm会进行重试
6、语言无关性:
Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.
3、Storm物理架构
1、nimbus:
Storm的Master,负责资源分配和任务调度,一个Storm集群只有一个Nimbus,此节点是一个无状态节点,所有的一切都存储再Zookeeper
2、supervisor:
Storm的Slave,负责接受Nimbus分配的任务管理所有的Worker,一个Supervisor节点包含多个Workers进程,默认4个,一般情况一个topology对应一个worker
3、workers:
工作进程(Process),每个工作进程中都有多个Task。
4、Task:
在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。
worker中每一个spout/bolt的线程称为一个task
同一个spout/bolt的task可能会共享一个物理线程(Thread),该线程称为executor
5、Storm的并行机制:
Topology由一个或多个Spout/Bolt组件构成。运行中的Topology由一个或多个Supervisor节点的Worker构成
6、流式计算框架(计算框架)
客户端将数据发送给MQ(消息队列),然后传递到Storm中进行计算
最终计算的结果存储到数据库中(HBase,mysql)
客户端不要求服务器返回结果,客户端可以一直向Storm发送数据
客户端相当于生产者,Storm相当于消费者
4、Storm的数据分发策略
--->
1、ShuffleGrouping
随机分组,随机派发stream里面的tuple,保证每个bolttask接收到的tuple数目大致相同。轮询,平均分配
优点:
为tuple选择task的代价小;
bolt的tasks之间的负载比较均衡;
缺点:
上下游components之间的逻辑组织关系不明显;
--->
2、FieldsGrouping
按字段分组
比如,按"user-id"这个字段来分组,那么具有同样"user-id"的tuple会被分到相同的Bolt里的一个task,而不同的"user-id"则可能会被分配到不同的task。
优点:
上下游components之间的逻辑组织关系显著;
缺点:
付出为tuple选择task的代价;
bolt的tasks之间的负载可能不均衡,根据field字段而定;
--->
3、AllGrouping
广播发送,对于每一个tuple,所有的bolts都会收到
优点:
上游事件可以通知下游bolt中所有task;
缺点:
tuple消息冗余,对性能有损耗,请谨慎使用;
--- >
4、GlobalGrouping
全局分组,把tuple分配给taskid最低的task。
优点:
所有上游消息全部汇总,便于合并、统计等;
缺点:
bolt的tasks之间的负载可能不均衡,id最小的task负载过重;
--- >
5、DirectGrouping
指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。
只有被声明为DirectStream的消息流可以声明这种分组方法。
而且这种消息tuple必须使用emitDirect方法来发射。
消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)
优点:
Topology的可控性强,且组件的各task的负载可控;
缺点:
当实际负载与预估不符时性能削弱;
--->
6、Localorshufflegrouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致
优点:
相对于ShuffleGrouping,因优先选择同进程task间传输而降低tuple网络传输代价,但因寻找同进程的task而消耗CPU和内存资源,因此应视情况来确定选择ShuffleGrouping或LocalOrShuffleGrouping;
缺点:
上下游components之间的逻辑组织关系不明显;
--->
7、NoneGrouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shufflegrouping是一样的效果。有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
8、customGrouping
自定义,相当于mapreduce那里自己去实现一个partition一样。
简单案例
测试小案例:
-- NumberTopology --
/*
* 创建一个有向无环图
* */
public class NumberTopology
public static void main(String[] args)
//创建Topology的构建器
TopologyBuilder topologyBuilder = new TopologyBuilder();
//开始构建整个流程(Spout)
topologyBuilder.setSpout("numberspout", new NumberSpout());
//开始构建整个流程(Bolt)
topologyBuilder.setBolt("numberBolt", new NumberBolt()).shuffleGrouping("numberspout");
//Bolt
//topologyBuilder.setBolt("ownBolt",new OwnBolt()).shuffleGrouping("numberspout");
//启动Topology
Config conf = new Config();
//创建一个topology
StormTopology topology = topologyBuilder.createTopology();
//本地模式启动集群
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("numberTopology", conf, topology);
-- NumberBolt --
public class NumberBolt extends BaseBasicBolt
//声明一个统计器
private static int sum;
/**
* 处理数据的业务逻辑
*
* @param input
* @param collector
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector)
System.out.println("从上游获取的数据:" + input);
System.out.println("从上游获取的数据为:" + input.getInteger(0) + "--" + input.getIntegerByField("num"));
//开始统计
sum += input.getInteger(0);
System.out.println("截止到本次,共获取数据和为:" + sum);
/**
* 如果需要向下传递数据,需要提前定义数据的格式
*
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("num"));
-- NumberSpout --
public class NumberSpout extends BaseRichSpout
//声明一个SpoutOutputCollector对象,用于发送数据
private SpoutOutputCollector collector;
//声明一个计数器
private int number;
/**
* 当我们执行任务的时候,用于初始化对象
*
* @param conf
* @param context
* @param collector 帮助我们将Tuple发送到下一个Bolt
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
//获取初始化时的发送器
this.collector = collector;
/**
* 重复调用这个方法从源数据获取一条记录
* 我们根据业务需求近期进行封装,然后通过SpoutOutputCollector发送给下一个Bolt
*/
@Override
public void nextTuple()
//将数据发送下一个Bolt
this.collector.emit(new Values(number++));
try
//限制传输速度
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
/**
* 定义你输出值对应的属性
*
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("num"));
5、Storm的通信机制
① Worker进程间通信原理
宏观
① 每个worker进程都一个独立的接受线程和发送线程
② 接收线程接受外部发送的消息到执行器(executor)的收集队列(incoming-queue)中
③ 发送线程从worker的执行器发送队列(transfer-queue)中读取消息,通过网络发给其他worker
内部微观
① worker中有多个执行器executor,每个执行器都有对应的接受进程和发送线程,
② worker接受线程将消息分配给指定taskid的executor的接受器中
③ executor会有段单独的线程进行处理,最后结果outgoing-queue,达到阈值进行批量发送tuple(元组)
② Worker进程内通信原理
Disruptor是一个Queue队列。
Disruptor一种线程之间信息无锁的交换方式。
Disruptor主要特点:
1、 没有竞争=没有锁=非常快。
2、 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结
Disruptor 核心技术点:
Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另外一边消费者并行取出处理.
底层是单个数据结构:一个ring buffer(环形数据缓冲区)
③ Storm的容错机制
节点故障
Nimbus宕机
单点故障,1.0后便是高可用的了
非Nimbus宕机
故障时,所有任务超时,Nimbus将会将任务重新分配到别的服务器上
进程故障
1、Worker
worker由Supervisor负责监控,一旦worker故障,会尝试重启,如果还是失败并且失去心跳机制,那么Nimbus将会把worker的任务分配给其他服务器
2、Supervisor
一旦遇到异常情况,直接自动毁灭,无状态(信息存放在zookeeper中)
① 快速失败:在遍历过程中进行增删改,抛出异常 多进程下不能发送修改
② 安全失败:遍历的数据先复制一份,对复制的进行遍历并发修改,来保证安全
④ 任务级容错
1、Bolt 任务冲突crash引起消息未被应答
执行超时,会调用fail方法
2、acker任务失败
首先持有的消息超时失效,Spout的fail方法执行
3、Spout任务失败
此时消息的完整性交给外部设备比如MQ
⑤ 消息容完整性
每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成千上万个新的Tuple
形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被完全处理了。
ACKER:
acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,
如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了
相反则会告知spout该消息处理成功了。
Flume+Kafka+Storm+Redis实时分析系统基本架构
今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同。这篇文章的目的只是带大家入个门,让大家对实时分析技术有一个简单的认识,并和大家一起做学习交流。
文章的最后还有Troubleshooting,分享了作者在部署本文示例程序过程中所遇到的各种问题和解决方案。
系统基本架构
整个实时分析系统的架构就是先由电商系统的订单服务器产生订单日志, 然后使用Flume去监听订单日志,并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 接着由Storm系统消费Kafka中的消息,同时消费记录由Zookeeper集群管理,这样即使Kafka宕机重启后也能找到上次的消费记录,接着从上次宕机点继续从Kafka的Broker中进行消费。但是由于存在先消费后记录日志或者先记录后消费的非原子操作,如果出现刚好消费完一条消息并还没将信息记录到Zookeeper的时候就宕机的类似问题,或多或少都会存在少量数据丢失或重复消费的问题, 其中一个解决方案就是Kafka的Broker和Zookeeper都部署在同一台机子上。接下来就是使用用户定义好的Storm Topology去进行日志信息的分析并输出到Redis缓存数据库中(也可以进行持久化),最后用Web APP去读取Redis中分析后的订单信息并展示给用户。之所以在Flume和Storm中间加入一层Kafka消息系统,就是因为在高并发的条件下, 订单日志的数据会井喷式增长,如果Storm的消费速度(Storm的实时计算能力那是最快之一,但是也有例外, 而且据说现在Twitter的开源实时计算框架Heron比Storm还要快)慢于日志的产生速度,加上Flume自身的局限性,必然会导致大量数据滞后并丢失,所以加了Kafka消息系统作为数据缓冲区,而且Kafka是基于log File的消息系统,也就是说消息能够持久化在硬盘中,再加上其充分利用Linux的I/O特性,提供了可观的吞吐量。架构中使用Redis作为数据库也是因为在实时的环境下,Redis具有很高的读写速度。
业务背景
各大电商网站在合适的时间进行各种促销活动已是常态,在能为网站带来大量的流量和订单的同时,对于用户也有不小的让利,必然是大家伙儿喜闻乐见的。在促销活动期间,老板和运营希望能实时看到订单情况,老板开心,运营也能根据实时的订单数据调整运营策略,而让用户能实时看到网站的订单数据,也会勾起用户的购买欲。但是普通的离线计算系统已然不能满足在高并发环境下的实时计算要求,所以我们得使用专门实时计算系统,如:Storm, Heron, Spark Stream等,去满足类似的需求。
既然要分析订单数据,那必然在订单产生的时候要把订单信息记录在日志文件中。本文中,作者通过使用log4j2,以及结合自己之前开发电商系统的经验,写了一个订单日志生成模拟器,代码如下,能帮助大家随机产生订单日志。下面所展示的订单日志文件格式和数据就是我们本文中的分析目标,本文的案例中用来分析所有商家的订单总销售额并找出销售额钱20名的商家。
订单数据格式:
orderNumber: XX | orderDate: XX | paymentNumber: XX | paymentDate: XX | merchantName: XX | sku: [ skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;skuName: XX skuNum: XX skuCode: XX skuPrice: XX totalSkuPrice: XX;] | price: [ totalPrice: XX discount: XX paymentPrice: XX ]
订单日志生成程序:
使用log4j2将日志信息写入文件中,每小时滚动一次日志文件
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="myConsole" target="SYSTEM_OUT">
<PatternLayout pattern="%dyyyy-MM-dd HH:mm:ss [%t] %-5level %logger36 - %msg%n"/>
</Console>
<RollingFile name="myFile" fileName="/Users/guludada/Desktop/logs/app.log"
filePattern="/Users/guludada/Desktop/logs/app-%dyyyy-MM-dd-HH.log">
<PatternLayout pattern="%dyyyy-MM-dd HH:mm:ss [%t] %-5level %logger36 - %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy />
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<Root level="Info">
<AppenderRef ref="myConsole"/>
<AppenderRef ref="myFile"/>
</Root>
</Loggers>
</Configuration>
生成器代码:
package com.guludada.ordersInfo;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ordersInfoGenerator
public enum paymentWays
Wechat,Alipay,Paypal
public enum merchantNames
优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
public enum productNames
黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
沙滩拖鞋
float[] skuPriceGroup = 299,399,699,899,1000,2000;
float[] discountGroup = 10,20,50,100;
float totalPrice = 0;
float discount = 0;
float paymentPrice = 0;
private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);
private int logsNumber = 1000;
public void generate()
for(int i = 0; i <= logsNumber; i++)
logger.info(randomOrderInfo());
public String randomOrderInfo()
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String orderNumber = randomNumbers(5) + date.getTime();
String orderDate = sdf.format(date);
String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
String paymentDate = sdf.format(date);
String merchantName = randomMerchantNames();
String skuInfo = randomSkus();
String priceInfo = calculateOrderPrice();
return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName +
" | sku: " + skuInfo + " | price: " + priceInfo;
private String randomPaymentWays()
paymentWays[] paymentWayGroup = paymentWays.values();
Random random = new Random();
return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
private String randomMerchantNames()
merchantNames[] merchantNameGroup = merchantNames.values();
Random random = new Random();
return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
private String randomProductNames()
productNames[] productNameGroup = productNames.values();
Random random = new Random();
return productNameGroup[random.nextInt(productNameGroup.length)].name();
private String randomSkus()
Random random = new Random();
int skuCategoryNum = random.nextInt(3);
String skuInfo ="[";
totalPrice = 0;
for(int i = 1; i <= 3; i++)
int skuNum = random.nextInt(3)+1;
float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
float totalSkuPrice = skuPrice * skuNum;
String skuName = randomProductNames();
String skuCode = randomCharactersAndNumbers(10);
skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
+ " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";
totalPrice += totalSkuPrice;
skuInfo += " ]";
return skuInfo;
private String calculateOrderPrice()
Random random = new Random();
discount = discountGroup[random.nextInt(discountGroup.length)];
paymentPrice = totalPrice - discount;
String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
return priceInfo;
private String randomCharactersAndNumbers(int length)
String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
String randomCharacters = "";
Random random = new Random();
for (int i = 0; i < length; i++)
randomCharacters += characters.charAt(random.nextInt(characters.length()));
return randomCharacters;
private String randomNumbers(int length)
String characters = "0123456789";
String randomNumbers = "";
Random random = new Random();
for (int i = 0; i < length; i++)
randomNumbers += characters.charAt(random.nextInt(characters.length()));
return randomNumbers;
public static void main(String[] args)
ordersInfoGenerator generator = new ordersInfoGenerator();
generator.generate();
收集日志数据
采集数据的方式有多种,一种是通过自己编写shell脚本或Java编程采集数据,但是工作量大,不方便维护,另一种就是直接使用第三方框架去进行日志的采集,一般第三方框架的健壮性,容错性和易用性都做得很好也易于维护。本文采用第三方框架Flume进行日志采集,Flume是一个分布式的高效的日志采集系统,它能把分布在不同服务器上的海量日志文件数据统一收集到一个集中的存储资源中,Flume是Apache的一个顶级项目,与Kafka也有很好的兼容性。不过需要注意的是Flume并不是一个高可用的框架,这方面的优化得用户自己去维护。
Flume的agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每一个Flume agent部署在一台服务器上,Flume会收集web server产生的日志数据,并封装成一个个的事件发送给Flume Agent的Source,Flume Agent Source会消费这些收集来的数据事件(Flume Event)并放在Flume Agent Channel,Flume Agent Sink会从Channel中收集这些采集过来的数据,要么存储在本地的文件系统中要么作为一个消费资源分给下一个装在分布式系统中其它服务器上的Flume Agent进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另一个服务器上的Flume Agent Channel里或者正确保存到了本地的文件存储系统中,才会被移除。
在本文中,Flume的Source我们选择的是Exec Source,因为是实时系统,直接通过tail 命令来监听日志文件,而在Kafka的Broker集群端的Flume我们选择Kafka Sink 来把数据下沉到Kafka消息系统中。
下图是来自Flume官网里的Flume拉取数据的架构图:
图片来源:http://flume.apache.org/FlumeUserGuide.html
订单日志产生端的Flume配置文件如下:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel
agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000
agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
Kafka 消息系统端Flume配置文件
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000
agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel
这里需要注意的是,在日志服务器端的Flume agent中我们配置了一个interceptors,这个是用来为Flume Event(Flume Event就是拉取到的一行行的日志信息)的头部添加key为“topic”的K-V键值对,这样这条抓取到的日志信息就会根据topic的值去到Kafka中指定的topic消息池中,当然还可以为Flume Event额外配置一个key为“Key”的键值对,Kafka Sink会根据key“Key”的值将这条日志信息下沉到不同的Kafka分片上,否则就是随机分配。在Kafka集群端的Flume配置里,有几个重要的参数需要注意,“topic”是指定抓取到的日志信息下沉到Kafka哪一个topic池中,如果之前Flume发送端为Flume Event添加了带有topic的头信息,则这里可以不用配置;brokerList就是配置Kafka集群的主机地址和端口;requireAcks=1是配置当下沉到Kafka的消息储存到特定partition的leader中成功后就返回确认消息,requireAcks=0是不需要确认消息成功写入Kafka中,requireAcks=-1是指不光需要确认消息被写入partition的leander中,还要确认完成该条消息的所有备份;batchSize配置每次下沉多少条消息,每次下沉的数量越多延迟也高。
Kafka消息系统
这一部分我们将谈谈Kafka的配置和使用,Kafka在我们的系统中实际上就相当于起到一个数据缓冲池的作用, 有点类似于ActiveQ的消息队列和Redis这样的缓存区的作用,但是更可靠,因为是基于log File的消息系统,数据不容易丢失,以及能记录数据的消费位置并且用户还可以自定义消息消费的起始位置,这就使得重复消费消息也可以得以实现,而且同时具有队列和发布订阅两种消息消费模式,十分灵活,并且与Storm的契合度很高,充分利用Linux系统的I/O提高读写速度等等。另一个要提的方面就是Kafka的Consumer是pull-based模型的,而Flume是push-based模型。push-based模型是尽可能大的消费数据,但是当生产者速度大于消费者时数据会被覆盖。而pull-based模型可以缓解这个压力,消费速度可以慢于生产速度,有空余时再拉取那些没拉取到的数据。
Kafka是一个分布式的高吞吐量的消息系统,同时兼有点对点和发布订阅两种消息消费模式。Kafka主要由Producer,Consumer和Broker组成。Kafka中引入了一个叫“topic”的概念,用来管理不同种类的消息,不同类别的消息会记录在到其对应的topic池中,而这些进入到topic中的消息会被Kafka写入磁盘的log文件中进行持久化处理。Kafka会把消息写入磁盘的log file中进行持久化对于每一个topic 里的消息log文件,Kafka都会对其进行分片处理,而每一个 消息都会顺序写入中log分片中,并且被标上“offset”的标量来代表这条消息在这个分片中的顺序,并且这些写入的消息无论是内容还是顺序都是不可变的。所以Kafka和其它消息队列系统的一个区别就是它能做到分片中的消息是能顺序被消费的,但是要做到全局有序还是有局限性的,除非整个topic只有一个log分片。并且无论消息是否有被消费,这条消息会一直保存在log文件中,当留存时间足够长到配置文件中指定的retention的时间后,这条消息才会被删除以释放空间。对于每一个Kafka的Consumer,它们唯一要存的Kafka相关的元数据就是这个“offset”值,记录着Consumer在分片上消费 到了哪一个位置。通常Kafka是使用Zookeeper来为每一个Consumer保存它们的offset信息,所以在启动Kafka之前需要有一个Zookeeper集群;而且Kafka默认采用的是先记录offset再读取数据的策略,这种策略会存在少量数据丢失的可能。不过用户可以灵活设置Consumer的“offset”的位置,在加上消息记录在log文件中,所以是可以重复消费消息的。log的分片和它们的备份会分散保存在集群的服务器上,对于每一个partition,在集群上都会有一台这个partition存在的服务器作为leader,而这个partitionpartition的其它备份所在的服务器做为follower,leader负责处理关于这个partition的所有请求,而follow er负责这个partition的其它备份的同步工作,当leader服务器宕机时,其中一个follower服务器就会被选举为新的leader。
一般的消息系统分为两种模式,一种是点对点的消费模式,也就是queuing模式,另一种是发布订阅模式,也就是publish-subscribe模式,而Kafka引入了一个Consumer Group的概念,使得其能兼有两种模式。在Kafka中,每一个consumer都会标明自己属于哪个consumer group,每个topic的消息都会分发给每一个subscribe了这个topic的所有consumer group中的一个consumer实例。所以当所有的consumers都在同一个consumer group中,那么就像queuing的消息系统,一个message一次只被一个consumer消费。如果每一个consumer都有不同consumer group,那么就像public-subscribe消息系统一样,一个消息分发给所有的consumer实例。对于普通的消息队列系统,可能存在多个consumer去同时消费message,虽然message是有序地分发出去的,但是由于网络延迟的时候到达不同的consumer的时间不是顺序的,这时就失去了顺序性,解决方案是只用一个consumer去消费message,但显然不太合适。而对于Kafka来说,一个partiton只分发给每一个consumer group中的一个consumer实例,也就是说这个partition只有一个consumer实例在消费,所以可以保证在一个partition内部数据的处理是有序的,不同之处就在于Kafka内部对消息进行了分片处理,虽然看上去也是单consumer的做法,但是分片机制保证了并发消费。如果要做到全局有序,那么整个topic中的消息只有一个分片,并且每一个consumer group中只能有一个consumer实例。这实际上就是彻底牺牲了消息消费时的并发度。
Kafka的配置和部署十分简单
1. 首先启动Zookeeper集群,Kafka需要Zookeeper集群来帮助记录每一个Consumer的offset
2. 为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有两个节点的Kafka集群,那么节点1和节点2的最基本的配置如下:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=export/data/kafka
zookeeper.connect=localhost:2181
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9093
log.dir=/export/data/kafka
zookeeper.connect=localhost:2181
b
roker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上
Kafka进程要监听的端口
,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。3. 配置完上面的配置文件后,只要分别在节点上输入下面命令启动Kafka进程就可以使用了
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
Storm实时计算框架 接下来开始介绍本篇文章要使用的实时计算框架 Storm 。 Strom 是一个非常快的实时计算框架,至于快到什么程度呢?官网首页给出的数据是每一个 Storm 集群上的节点每一秒能处理一百万条数据。相比 Hadoop 的 “Mapreduce” 计算框架, Storm 使用的是 "Topology" ; Mapreduce 程序在计算完成后最终会停下来,而 Topology 则是会永远运行下去除非你显式地使用 “kill -9 XXX” 命令停掉它。和大多数的集群系统一样, Storm 集群也存在着 Master 节点和 Worker 节点,在 Master 节点上运行的一个守护进程叫 “Nimbus” ,类似于 Hadoop 的 “JobTracker” 的功能,负责集群中计算程序的分发,任务的分发,监控任务和工作节点的运行情况等; Worker 节点上运行的守护进程叫 “Supervisor” ,负责接收 Nimbus 分发的任务并运行,每一个 Worker 上都会运行着 Topology 程序的一部分,而一个 Topology 程序的运行就是由集群上多个 Worker 一起协同工作的。值得注意的是 Nimubs 和 Supervisor 之间的协调工作也是通过 Zookeeper 来管理的, Nimbus 和 Supervisor 自己本身在集群上都是无状态的,它们的状态都保存在 Zookeeper 上,所以任何节点的宕机和动态扩容都不会影响整个集群的工作运行,并支持 fast-fail 机制。
Storm 有一个很重要的对数据的抽象概念,叫做 “Stream” ,我们姑且称之为数据流,数据流 Stream 就是由之间没有任何关系的松散的一个一个的数据元组 “tuples” 所组成的序列。要在 Storm 上做实时计算,首先你得有一个计算程序,这就是 “Topology” ,一个 Topology 程序由 “Spout” 和 “Bolt” 共同组成。 Storm 就是通过 Topology 程序将数据流 Stream 通过可靠 (ACK 机制 ) 的分布式计算生成我们的目标数据流 Stream ,就比如说把婚恋网站上当日注册的所有用户信息数据流 Stream 通过 Topology 程序计算出月收入上万年龄在 30 岁以下的新的用户信息流 Stream 。在我们的文章中, Spout 就是实现了特定接口的 Java 类,它相当于数据源,用于产生数据或者从外部接收数据;而 Bolt 就是实现了 Storm Bolt 接口的 Java 类,用于消费从 Spout 发送出来的数据流并实现用户自定义的数据处理逻辑;对于复杂的数据处理,可以定义多个连续的 Bolt 去协同处理。最后在程序中通过 Spout 和 Bolt 生成 Topology 对象并提交到 Storm 集群上执行。
tuples是Storm的数据模型,,由值和其所对应的field所组成,比如说在Spout或Bolt中定义了发出的元组的field为:(name,age,gender),那么从这个Spout或Bolt中发出的数据流的每一个元组值就类似于(''咕噜大大",27,"中性")。 在 Storm 中还有一个 Stream Group 的概念,它用来决定从 Spout 或或或 Bolt 组件中发出的 tuples 接下来应该传到哪一个组件中或者更准确地说在程序里设置某个组件应该接收来自哪一个组件的 tuples; 并且在 Storm 中提供了多个
以上是关于Storm实时处理架构的主要内容,如果未能解决你的问题,请参考以下文章
Flume+Kafka+Storm+Redis实时分析系统基本架构
Flume+Kafka+Storm+Redis实时分析系统基本架构
10年大数据架构师,进行Kafka+Storm+HDFS整合实践,奉上一生经验