storm应用实践:实时事务处理《读书笔记》
Posted tangsilian
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm应用实践:实时事务处理《读书笔记》相关的知识,希望对你有一定的参考价值。
文章目录
阅读目的:
地址:https://weread.qq.com/web/reader/b06327c0717cc12cb06c27ek9bf32f301f9bf31c7ff0a60
阅读本书的前四章,了解storm常见的应用场景和实际案例。
第1章介绍大数据和Storm在大数据中所处的地位,该章的目的是展示一个选择Storm的理由和时机,一些关于大数据应用的关键特性,各类用于处理大数据的工具,以及明确Storm的工具类型。
第2章借助一个对某GitHub库提交数的统计案例,解释Storm的核心概念。该章将建立学习Storm的相关术语基础,尝试一小段代码来学习建立Storm工程,而这个案例中的概念也将贯穿本书。
第3章讲解在Storm下设计拓扑结构的最佳实践,同时以一个社交热力图的应用为例,展示了如何将问题基于Storm的结构来做分解,以便适用于程序的上下文实现部署。该章还讨论了如何处理不稳定的数据源,或者是不可靠的外部服务。同时在该章中介绍的首字节并行性,也将成为后续章节中的重点,最后在该章中还深入讨论了高级拓扑设计范式。
第4章以一个信用卡的授权系统为例,探讨Storm如何确保消息以上下文的形式传输,阐述Storm的实现机制,并且如何基于一套方案的部署,提供不同层面的可靠性支持。同时该章在最后做一个总结,说明如何在Storm的拓扑结构上,实现这种不同层次的可靠性支持。
第1章 Storm简介:
Storm是什么
大数据的定义
大数据工具
Storm如何应用于大数据场景
选择Storm的理由
Apache Storm是一个分布式实时计算框架,适用于处理无边界的流数据。将Storm与你当前使用的队列和持久化技术相结合,就能实现多种处理和转换流数据的方式。
1.1 什么是大数据
1.1.1 大数据的四大特性大数据有四个公认的特性:
体量(Volume)、速度(Velocity)、多样性(Variety)和真实性(Veracity)。
1.1.2 大数据工具
数据处理:这些工具主要用于基于指定的计算方式,让数据集释放出有价值的信息。
数据传输:这些工具主要用于将数据收集并提取至数据处理系统,或者在不同组件之间执行数据的传输。数据格式可以不限,但最终它们都会共用一套消息总线(或者称之为消息队列),例如Kafka、Flume、Scribe和Scoop。
数据存储:这些工具主要用于在不同阶段的数据处理期间,为数据集提供存储服务,它们可以是分布式的文件系统,例如分布式文件系统HDFS(Hadoop Distributed File System)或者GlusterFS以及Cassandra这类NoSQL结构的数据库。
数据处理分为批处理和流处理:
批处理:允许你对数据按照不同维度执行连接、合并或者聚合操作,这也是为什么批处理模式目前被广泛应用于机器学习的算法上。
流处理:实现立即输出结果,系统会依次对单点的简单数据执行处理。海量数据按照流的方式输入。
1.2 Storm如何应用于大数据应用场景
Storm是一个分布式实时计算框架,适用于处理无边界的流数据。
将Storm与你当前使用的队列和持久化技术相结合,就能实现多种处理和转换流数据的方式。
Storm与其他常用工具之间的对比:
Hadoop
在过去基本上是批处理系统的代名词,随着Hadoop的第二代版本发布,它不仅在系统层面更加完善,还可以说是逐渐成为一个具备大数据处理能力的应用平台。它的批处理组件称为Hadoop MapReduce,作业调度器和集群资源管理器的组件叫做YARN,分布式文件系统叫HDFS。
无论Hadoop如何将代码在数据上执行移动运算,Storm做的是将数据指向代码。这在流数据处理的系统中显得更为合理,
Apache Spark
和Hadoop的MapReduce类似,Spark是一个类似的批处理工具,也能运行在Hadoop的YARN资源管理器之上。有意思的一点就是,Spark允许你在中间或是结尾处,将数据缓存至内存中(有必要也可以将输出保存到磁盘上)。这种特性最有价值的一点,就是特别适用于在一个相同的数据集上反复执行运算,并且能将上一次运算按照一定算法保留,作为下一次运算的输入。
Spark Streaming和Storm类似,Spark Streaming用于处理无边界的流数据,但不同点在于,SparkStreaming不会将数据按照类别导入到流处理工具中,取而代之的是将其导入到微型批处理工具中。Spark Streaming是建立在Spark之上的,它需要将输入的流数据标记成一个个数据批次,以便执行操作。
Apache Samza
Samza是一个新兴的流数据处理系统,是由LinkedIn(领英)团队打造,效果完全和Storm不相上下。但你依然会发现一些区别,这里无论是Storm还是Spark或者Spark Streaming,它们都运行在基于YARN的资源管理器上,而Samza则是与YARN系统分开独立运行的。
1.3 为什么你希望使用Storm
[插图]它可以广泛用于各类用户场景中。
[插图]它可以和不同技术协同工作。
[插图]它具备可扩展性,Storm可以轻松将工作分解至不同线程上,并分派至不同JVM(Java虚拟机)上,甚至是不同的物理机上,而这些还不需要在你的代码上做任何调整(只需要修改配置就可以了)。
[插图]它可以确保每个输入的数据至少会被处理一次。
[插图]它相当健壮,你也可以称之为高容错性。Storm中有四个主要的组件,在大部分时间里,摧毁任何一个组件都不会中断数据的处理。
[插图]它与使用的编程语言无关,如果你的程序能在JVM上执行,它就可以在Storm上轻松执行。即使没法在JVM上执行,如果你能在一个*nix命令行中调用它,它也可以在Storm上正常运行(尽管在本书中,我们将限定于使用JVM和Java)。
第2章 Storm核心概念
难理解执行器(executor)和任务(task)
2.1 计算github提交监控看盘
2.2 Storm基础概念
2.2.1 拓扑图
2.2.2 元组
元组(tuple)是拓扑中结点之间传输数据的形式,它本身是一个有序的数值序列,其中每个数值都会被赋予一个命名。一个结点可以创建元组,然后发送(可选)至任意其他结点,这个发送元组到任意结点的过程,称作发射(emit)一个元组。
2.2.3 流
根据Storm维基中的描述,一个流是一个“无边界的元组序列”,这是关于流最恰当的解释。在拓扑中,一个流是拓扑中两个结点间一个无边界的元组序列。
2.2.4 spout
一个spout是拓扑的流数据源头,spout通常会从外部数据源读取数据并且向拓扑中发射元组,它可以实现监听包括消息队列、数据库或者任何其他数据输入源。在我们的例子中,spout监听的是GitHub中代码仓库的提交消息,并将这些实时数据灌入Storm拓扑中。
2.2.5 bolt
不同于spout只负责监听数据源,bolt可以完成从输入流的元组接收,对元组进行计算或转换操作,如过滤、聚合和连接等,以及可能会发射新的元组形成输出流。
bolt和spout是如何工作的如图2.9和图2.10所示,spout和bolt都显示为一个独立的组件,仅从逻辑视角看这是没问题的,但当讨论到它们是如何工作的时候,就需要更深入地理解和认识了。在一个运行的拓扑中,通常有大量的bolt和spout实例并行运行。
2.3 在Storm中实现GitHub提交数监控看板
完整代码见:https://github.com/mfilipelino/storm-notes
2.3.2 实现spout
class CommitFeedListener extends BaseRichSpout
private SpoutOutputCollector outputCollector;//发射元组
private List<String> commits;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("commit"));
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
this.outputCollector = collector;
commits = FakeData.changeLog();
@Override
public void nextTuple()
for (String commit : commits)
outputCollector.emit(new Values(commit));
2.3.3 实现bolt
class EmailExtractor extends BaseBasicBolt
@Override
public void execute(Tuple input, BasicOutputCollector collector)
String commit = input.getStringByField("commit");
String[] parts = commit.split(" ");
collector.emit(new Values(parts[1]));
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("email"));
class EmailCounter extends BaseRichBolt
private Map<String, Integer> counts;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
initMap();
private void initMap()
this.counts = new HashMap < String, Integer > ();
@Override
public void execute(Tuple input)
String email = input.getStringByField("email");
counts.put(email, countFor(email) + 1);
printCounts();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
// nada
private Integer countFor(String email)
Integer count = counts.get(email);
return count == null ? 0 : count;
private void printCounts()
for (String email: counts.keySet())
System.out.println(String.format("%s has count of %s", email, counts.get(email)));
class EmailCounter extends BaseRichBolt
private Map<String, Integer> counts;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
initMap();
private void initMap()
this.counts = new HashMap < String, Integer > ();
@Override
public void execute(Tuple input)
String email = input.getStringByField("email");
counts.put(email, countFor(email) + 1);
printCounts();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
// nada
private Integer countFor(String email)
Integer count = counts.get(email);
return count == null ? 0 : count;
private void printCounts()
for (String email: counts.keySet())
System.out.println(String.format("%s has count of %s", email, counts.get(email)));
2.3.4 集成各个部分组成拓扑
我们的spout和bolt代码单独来看是无法运行的,需要先构建拓扑,并定义流和spout以及bolt之间的流分组策略。在此之后,我们就可以运行一个测试来判断拓扑是否能正常工作。Storm提供了你所需要的所有类,如下所示:
[插图]TopologyBuilder,这个类用来将spout和bolt代码片段合并在一起,并定义流和流分组策略。
[插图]Config,这个类用来定义拓扑层的配置。
[插图]StormTopology,这个类是由TopologyBuilder构建出来的,并且会被提交到集群上运行。
[插图]LocalCluster,这个类将在本地模拟一个Storm集群,使得我们可以轻松实现拓扑的运行测试。理解了这些类,我们接下来就要构建拓扑,并提交到本地集群进行测试,如代码清单如下所示。
public class LocalTopologyRunner
private static final int TEN_MINUTES = 600000;
public static void main(String[] args)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
builder.setBolt("email-extractor", new EmailExtractor())
.shuffleGrouping("commit-feed-listener");
builder.setBolt("email-counter", new EmailCounter())
.fieldsGrouping("email-extractor", new Fields("email"));
Config config = new Config();
config.setDebug(true);
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(
"github-commit-topology",
config,
topology
);
Utils.sleep(TEN_MINUTES);
cluster.killTopology("github-commit-topology");
cluster.shutdown();
第3章 拓扑设计
3.1 拓扑设计方法
拓扑图设计的五个步骤:
1.定义问题/构造一个概念上的解决方案。
2. 将解决方案映射到Storm中。
3.实现初始方案。在这一步中,每个相关组件都将被实现并完成部署。
4.扩展拓扑。
5.一边观察一边优化。
3.2 问题定义:一个社交热力图
构建概念性解决方案
3.3 将解决方案映射至Storm的逻辑
最好的方法是先考虑流经系统的数据的特性,当我们对数据流所包含的特性有足够理解后,对需求的理解也会更清晰,明白接下来应该如何在系统上建立实施。
参考:
问题
YARN系统是什么?
https://baike.baidu.com/item/yarn/16075826?fr=aladdin
以上是关于storm应用实践:实时事务处理《读书笔记》的主要内容,如果未能解决你的问题,请参考以下文章
应用案例 | 从Storm到Flink,有赞五年实时计算效率提升实践