Flink流式计算从入门到实战 一
Posted roykingw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink流式计算从入门到实战 一相关的知识,希望对你有一定的参考价值。
文章目录
Flink流式计算实战专题一
==楼兰==一、理解Flink与流计算
1、初识Flink
Flink是Apache基金会的一个顶级项目,是目前业界公认最好的大数据的流式计算框架。
关于Flink,最早起源于Stratosphere项目。在2010-2014年期间,这是一个由德国柏林工业大学联合几所欧洲大学共同参与的研究性项目。2014年开始改名为Flink并捐献给了Apache基金会进行孵化,最终在2014年12月,Flink迅速成为了Apache软件基金会的顶级项目。并迅速超越了传统的Apache Storm,坐上了流式计算的头把交椅。
在Flink的发展过程中,阿里扮演了非常重要的角色。在2015年,阿里就开始参与了Flink的建设,并且建立了自己的一个分支版本Blink,在阿里体系内得到广泛的使用。到2019年,阿里将自己的Blink版本合并到了Flink的主分支内,为Flink做出了极大的贡献。
Flink的官网地址:https://flink.apache.org/。在官网上同时也提供了中文版的主页。对于Flink的介绍,非常的言简意赅“ Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。”。他的项目Logo也是一个非常具有Apache特色的小松鼠。
从官网介绍上可以看到,Flink擅长于处理无界流和有界流数据。
- 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
- 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。
这个解释比较官方,实际上对应我们生活中的案例,就可以更清晰的理解有界流和无界流。无界流也就对应我们通常所说的流式数据。流式数据最大的特点是无界和实时。流式计算通常只关注系统中当前传输的数据,这些数据都是实时动态产生的。流式计算一般用于进行实时计算。例如我们通常搭建一个网站,并不知道客户什么时候会发起访问,所以只能搭建一个后台服务,用户每发起一次请求,后台服务就处理一次请求,这就是一个典型的无界流场景。还有我们经常写的一些MQ产品的消费端程序,其实也是一个处理无界流的应用。
有界流通常也被称为批处理。批处理最大的特点是有界和持久。批处理需要关注的是系统中的全套数据,这些数据大部分都是静态的。批处理一般用于进行大型的离线计算。例如我们写的SQL语句,都是针对一个表中包含的大小固定的数据进行处理。只不过在Flink中,将这些批量数据也当成是一种流来看待,因此才有了有界流这样一个概念。
2、Flink的适用场景
实际上,流式处理和批处理就是我们平常处理数据的常用方式。那为什么还要产生Flink这样专门的计算框架呢?在这里,我们还需要给Flink的适用场景加上一个特点,大,也就是数据量非常大。
在面临海量数据处理时,现有的很多开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们要实现的目标是完全不相同的。对于流处理,最需要关注的是低延迟和Exactly-once保证。而对于批处理,更为关注的是高吞吐、高效处理。所以在实现时通常会给出两套不同的实现方法。例如MapReduce就是专门进行批处理,而Storm专注于处理流式数据。而Flink则是一个真正意义上的流批统一的计算框架。
Flink在实现流处理和批处理时,采用了一些与传统计算不一样的方案,他从另一个视角来看点流处理和批处理,将二者统一到一起进行处理。Flink是完全支持流处理,也就是说作为流处理看待时,输入的数据流是无界的。而批处理则被作为一种特殊的流处理,只是他的输入数据流被定义为有界的。基于同一个Flink运行环境,分别提供了流处理和批处理API,而这两种API也实现上层面向流处理、批处理类型应用框架的基础。
典型的一些应用场景包括实时监控系统、推荐系统、日志分析系统等。
3、流式计算梳理
Flink虽然实现了流批统一的计算模式,但是本质上来说,他还是更偏向于流式计算。所以,在开始Flink学习之前,我们有必要来理解一下,一个典型的流式计算框架是什么样的,需要处理哪些问题。
实际上,在处理源源不断的流式数据时,通常会有两种不同的思路。
一种思路是典型的来一条数据处理一条数据。
例如我们针对kafka,写一个消费者应用。这个应用就会一直挂起,kafka中进来一条消息,就会处理一条消息。但是这样的话,消费端的处理能力始终是有限的。当消息越来越多时,消费端的处理能力不足,就可能会产生消息积累,从而影响到消息处理的及时性。所以,我们通常写的kafka消费者应用,还只能成为一种应用,而不能称为一种框架。而Kafka针对大量消息提供的Kafka Streams就是一个最为典型的流式计算框架。
一个典型的Kafka Streams应用是这样的:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
这是kafka官方的一个示例,实现的就是最为常见的wordcount的逻辑。他的思路是通过StreamsBuilder提前构建好一个数据处理链,在这个简单示例中,textLines这一个KStream处理节点,这个节点可以输出到kafka的一个Topic中,也可以输入到下一个KStream中。每一个KStream可以对数据进行单独的处理,整体构成完整的数据处理链。当然,在这个简单示例中,就只有textLines这一个KStream。
应用启动后,就会逐一读取TextLinesTopic这个Topic中的每一条消息,对每条消息进行word count统计后,缓存到wordCounts这个中间结果集当中。最终将中间结果集转存到WordsWithCountsTopic这个topic中。而在消息处理的过程中,会提供一个名叫counts-store的KeyValueStore,这相当于是一个中间结果,通常称为状态。这个KeyValueStore是基于RocksDB提供的一个本地快速缓存。
实际上,Kafka Streams框架的功能相当丰富全面。例如这个简单的示例,是以kafka的Topic作为输入和输出源,也可以很方便的对接到其他的一些数据源,例如文本,或者ES等。关于这个框架的详细内容,我们会在另外的课程中单独讲解。在这里,我们需要理解的是这种非常具有代表性的流式计算处理方案。
这种方案会预先构建出数据处理的链条,这个链条是一个由多个处理步骤组成的复杂链条。对每一条数据的计算过程会分解到链条上的不同节点上,而不同节点的计算任务,可以通过技术优化进行并行的处理。这样就能极大的提高数据的处理速度。在构建处理链条时,还会提供一个快速的缓存服务,用于对中间的计算状态进行持久化。然后将这个处理链条对接到不同的输入源和输出源上,就可以形成一个完整的计算过程。
老牌的流式计算框架Storm其实也是采用的类似的方案。
第二种思路是将流式数据看成一个一个小的批量数据。
典型的像是在Spark Streaming,在处理流式数据时,是把流式数据划分成一个一个小的批量数据,称为窗口。例如这是spark官方给出的一个简单的Demo。
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// 每秒创建一个窗口
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
//接收每一个窗口期的网络请求。
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
//进行流式的wordcount计算
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
这个代码大家应该非常熟悉了。在这个示例中,会将连续的网络请求参数切分成1秒一个的小窗口。每次处理数据时,也是对这1秒内的数据进行批量的处理。通过连续的批量处理完成整体的流式计算。这种方案可以很好的处理比如"最近五分钟内"这样的语义。
对这两种方案的比较很容易看出,第一种方案的时效性是比较高的,但是需要进行大量的并行计算,对计算资源的要求也会非常高。第二种方案,减少了并行计算,相对节省计算资源,但是时效性就没这么高。
这两种方案都是典型的大数据场景下的流式计算处理方案,Flink处理流式数据的方案也无出其右。我们可以先了解一下这两种方案,然后在后续学习Flink的过程中,逐步验证这些方案,理解Flink是如何在流式计算领域中傲视群雄的。
二、Flink安装部署
1、Flink的部署方式
Flink的部署方式非常多,Local、Standalone、Yarn、Mesos、Docker、Kubernetes、AWS都可以支持。其中,我们主要关注Local、Standalone、Yarn三种
LOCAL:不单独部署运行环境,在代码中直接调试。后续一些简单的代码样例就会以Local方式调试。
Standalone:独立运行环境。 这种模式下,Flink将自己完全管理运行资源。这种方式,其实资源利用率是比较低的。
Yarn:以Hadoop提供的Yarn作为资源管理服务。这样可以更高效的使用集群的机器资源。
接下来我们会带大家搭建Standalone和yarn两种集群,其他的几种部署方式也是为了兼容更多的资源管理服务,跟Yarn模式是很类似的,可以在用到的时候再去了解。
2、获取Flink
接下来就可以去Flink的官方下载页面获取Flink。下载地址 https://flink.apache.org/downloads.html。
当前最新版本是1.13.2,我们这次采用次新的1.12.5版本。选择Binaries下载运行时版本。在下载时会需要选择对应的scala版本。
Scala是基于JVM执行的一种语言,可以理解为跟Java很类似的。但是他的版本兼容性并没有java好,所以经常需要区分不同的版本。但是编译后都是提交到JVM上运行的。而我们下载的运行版本都是scala编译后的执行文件,所以跟scala语言没有太多关系。可以简单选择flink-1.12.5=bin-scala_2.12.tgz即可。
另外,如果需要使用hadoop的话,还需要回到flink的下载页面下载hadoop插件。
目前flink还尚未提供针对hadoop 3.x版本编译的插件包。但是根据官方的介绍,对于更新版本的Hadoop都是支持的。
3、实验环境与前置软件
实验环境:准备三台服务器,预装CentOS7操作系统,在/etc/hosts文件中分别配置机器名hadoop01,hadoop02,hadoop03。IP地址不重要,因为集群都会通过这个机器名来进行配置。三台服务器需要配置时间同步。
前置安装的软件:JDK8;Hadoop3.2.2版本(搭建yarn集群时需要)。Zookeeper采用3.5.8版本(Flink内置了zookeeper,在优化部署时通常还是采用外置的zookeeper)。
4、集群搭建
Flink的各种集群搭建方式基本上差不多。将Flink安装包上传到服务器hadoop01,并解压到/app/flink目录。可以看到,解压后的目录结构还是比较简单的。
lib目录下是flink的核心jar包,其中就包含了阿里贡献的blink的相关jar包。opt目录下是flink的一些扩展jar包。example下包含的是flink的示例。我们重点关注的是conf配置目录以及bin脚本目录。
在这里需要将之前下载的flink的hadoop插件包也一并上传到lib目录中。
1、接下来我们打开conf目录下的flink-conf.yaml。这个文件是flink的核心配置文件。 在文件对应的部分,修改如下配置:
# common
# 指定jobmanager地址
jobmanager.rpc.address: hadoop01
# High Avaibility
# 指定使用zookeeper进行高可用部署
high-availability: zookeeper
# 元数据存储地址。这个地址是需要所有节点都能访问到的一个地址。
high-availability.storageDir: hdfs://hadoop01:8020/flink/ha/
# 指定zookeeper节点。
high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
这个配置文件中的配置项相对不是很多,并且注释也说得非常详细,根据注释能够很容易了解到配置项的意义。在上面给出的示例配置中,jobmanager.rpc.address必须指定自己的机器名。而Hign Avaibility部分是配置使用zookeeper搭建HA高可用集群的,如果没有安装hadoop和zookeeper,可以不用配置。
2、然后需要修改conf目录下的masters和workers文件,指定集群中的节点。
# masters 文件
hadoop01:8081
# workers 文件
hadoop02
hadoop03
3、接下来,就可以将整个flink目录分发到集群中另外两个节点上。
scp -r /app/flink/ root@hadoop02:/app/flink
scp -r /app/flink/ root@hadoop03:/app/flink
5、Standalone模式启动
配置完成后,可以在hadoop01上直接执行flink的bin目录下的start-cluster.sh脚本。
注意:在之前的实验中配置了高可用集群,使用zookeeper来协调,而文件配置到了hdfs上,所以需要先启动zookeeper和hadoop。
正常启动后,在hadoop01机器上使用jps指令可以看到一个StandaloneSessionClusterEntrypoint进程,而在hadoop02和hadoop03机器上,使用jps指令可以看到一个TaskManagerRunner指令。如果启动有问题,可以到flink的log目录下去查看启动日志。
启动完成后,可以查看flink提供的管理页面,地址: http://hadoop01:8081
在这个页面可以查看集群的相关状态。这里面比较关键的就是这个Available Task Slots。这个Slots插槽就是Flink执行具体任务的单元。具体的作用会在后面再讲解,但是在这里你只需要知道,如果slots不够,Flink就无法执行任务。
而最后的submit New Job页面,可以提交任务。现在,可以尝试下将flink的example下的jar包提交到集群上执行一下。例如可以用flink的example/streaming/WordCount.jar提交上去执行,就可以看到他的执行过程。
这里其实就详细描述了这个任务的执行过程以及所占用的slots资源等情况。当然现在看不懂没有关系,后面会做详细讲解。
然后任务执行的具体结果可以到Jobs -> Tasks Mansger菜单中查看。具体自己去点一点,感受一下就可以了。
这种本地部署的方式,资源利用率不够高效,通常不会用在生产环境中。更多的是作为开发调试集群。
Standalone模式的官方指导文件:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/ 里面介绍了命令行提交任务的方式。
6、Yarn模式提交任务
Apache Hadoop的Yarn是许多数据处理框架中非常流行的资源提供者。Flink的服务提交给Yarn的ResourceManager后,ResourceManager会在由Yarn的NodeManager管理的机器上动态分配运行容器。Flink在这些容器上部署自己的任务。
Yarn模式是将Flink交由Yarn来进行资源分配,因此,在启动Yarn模式时,需要保证集群上的Hadoop集群已经启动(包括HDFS和Yarn),HADOOP_HOME环境变量也已经正常配置。
6.1、首先在yarn上启动yarn-session。
你可以理解为是在yarn上运行的一个Flink集群。需要用到flink的bin目录下的yarn-session.sh脚本
bin/yarn-session.sh --detached
如果执行正常的话,你会在命令行看到如下的一些关键的日志:
JobManager Web Interface: http://hadoop01:35740
2021-08-30 17:48:01,186 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1630315011051_0003
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1630315011051_0003
Note that killing Flink might not clean up all job artifacts and temporary files.
这里面提供了如何关闭yarn-session的指令。在这个日志中也提到JobManager的管理页面。http://hadoop01:35740 这个管理页面跟之前Standalone搭建出来的是一样的,只是上面的一些数据会略有不同。例如首页上的Available Task Slots会是0。也就是说这些slot节点管理交由了yarn进行管理,而不再由Flink自己管理。
在Yarn的管理页面能够看到一个Flink的执行任务。
6.2、提交测试任务
进入flink的bin目录,使用下面的指令直接提交任务。这里提交的是flink提供的一个示例
.bin/flink run ../examples/streaming/WordCount.jar
任务提交后,可以在Yarn的控制台,以及Flink的控制台上跟踪任务执行的过程以及结果。当然,这个时候我们并不关心任务执行的具体情况,只是先体验一下。
另外,在Flink on Yarn中总共支持三种提交任务的方式,也需要了解下。主要是记住几个指令就可以了,以后开发出来的应用程序需要选择一个方式提交到Flink上运行。
6.3 Application Mode 应用模式
应用模式将在任务的启动时临时在yarn上申请一个flink集群。任务从main方法启动开始就会提交到yarn上的flink集群执行。执行完成后,集群就会立即注销。
# 提交任务,主要是记住这个-t 参数
./bin/flink run-application -t yarn-application ./examples/streaming/WordCount.jar
# 查看集群上的任务
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# 手动停止集群上的任务
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
# 使用所有节点都能访问到的jar包来提交任务。
./bin/flink run-application -t yarn-application \\
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \\
hdfs://myhdfs/jars/my-application.jar
另外,这些提交的任务,同样可以在flink的管理页面进行管理与监控。
6.4 Per-job Cluster Mode 单任务模式
这种模式跟应用模式很类似,也会给每个应用在yarn上申请一个单独的flink集群。只不过这种模式下,任务是先在本地执行,构建数据处理链。构建完成后再将任务提交到flink集群上执行。
# 提交任务
./bin/flink run -t yarn-per-job --detached ./examples/streaming/WordCount.jar
# 查询集群上正在执行的任务
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# 手动停止集群上的任务
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
在提交任务时,可以添加一个–detached参数,或者可以简化为 -d 。这个参数表示本地任务在被提交到flink集群后就主动停止了。这非常适合于一些长期执行的流式计算。
6.5 Session Mode 会话模式
剩下这种会话模式就相当于是一个共享的模式。在yarn上申请一个固定的flink集群,然后所有任务都共享这个集群内的资源。实际上,在这一章节最开始提交的任务就是这种Session会话模式。其实,从Yarn上注册的资源名字也能区分出来。
会话模式同样也有–datched 或者 -d 这样的模式。 默认情况下是attached mode 绑定模式。这种模式下客户端提交一个任务到flink集群后,客户端程序会继续执行。持续跟踪任务在集群中的运行状态。如果集群上任务执行失败了,本地客户端也会显示出这些错误。而如果本地客户端应用停止了,也同样会通知集群停止对应的任务。
加上–detached后,就转变为 detached mode 解除绑定模式。这种模式下本地客户端提交任务到集群后就停止了。任务在集群中的执行状态需要由yarn或者其他的管理工具进行监控。
# 提交任务
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY ./examples/streaming/WordCount.jar
# 重新绑定另外一个yarn session
./bin/yarn-session.sh -id application_XXXX_YY
# 查看帮助
./bin/yarn-session.sh -h
会话模式在执行时会在本地创建一个临时的配置文件,默认创建在/tmp目录下。
官方指导文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html 注意下版本。
最后注意,在生产环境中,一般Yarn上的资源都比较充足,优先建议使用Per-job模式,其次是Application模式。 这两种模式能够更好进行应用隔离。当然,如果集群的资源确实非常紧张,也可以使用Session模式。
以上是关于Flink流式计算从入门到实战 一的主要内容,如果未能解决你的问题,请参考以下文章