Flink流计算开发
Posted JAVA狐
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink流计算开发相关的知识,希望对你有一定的参考价值。
本文以一个来自官网的wiki-edit例子为引,借此了解 Flink 流计算开发模型、理解相关概念、理解Flink逻辑。
Flink是大数据处理领域最近很火的一个开源的分布式、高性能的流式处理框架,其对数据的处理可以达到毫秒级别。
1、
2、开发环境准备(均做了高可用可实现故障自动转移)
在开发环境启动下面的 3 个集群:
启动 Hadoop 集群(5 台)
启动 Flink 集群(3 台)
启动 kafka 集群(3 台
3、Flink 开发准备
3.1)IntelliJ IDEA(比 eclipse 好用)
3.2)创建 Maven 工程
我们将使用一个 Flink Maven 原型来创建工程结构,可以运行下面的命令来创建 Maven 工程:
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
-DarchetypeVersion=1.3-SNAPSHOT
-DgroupId=wiki-edits
-DartifactId=wiki-edits
-Dversion=0.1
-Dpackage=wikiedits
-DinteractiveMode=false
我这里是(在 cmd 中执行):
mvn archetype:generate ^
-DarchetypeGroupId=org.apache.flink ^
-DarchetypeArtifactId=flink-quickstart-java ^
-DarchetypeCatalog=http://localhost:8081/nexus/content/groups/public/ ^
-DarchetypeVersion=1.0.0 ^
-DgroupId=wiki-edits ^
-DartifactId=wiki-edits ^
-Dversion=0.1 ^
-Dpackage=wikiedits ^
-DinteractiveMode=false
你可以根据需要修改 groupId,artifactId 和 package 参数。使用上面的命令和参数,Maven 将会创建出一个工程框架,其结构如下所示:
wiki-edits 项目结构:
wiki-edits/
├── pom.xml
└── src
└── main
├── java
│ └── wikiedits
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
删除 wiki-edits/src/main/java/wikiedits/*.java
把项目wiki-edits导入IntelliJ IDEA
3.3)添加 Flink Wikipedia 连接器依赖
和 Spark 一样,Flink 内置提供了读/写 Kafka Topic 的 Kafka 连接器(Kafka Connectors)。Flink Kafka Consumer 和 Flink 的 Checkpint 机制进行了整合,以此提供了 exactly-once 处理语义。为了实现这个语义,Flink 不仅仅依赖于追踪 Kafka 的消费者 group 偏移量,而且将这些偏移量存储在其内部用于追踪。
和 Spark 一样,Flink 和 Kafka 整合的相关 API 也没有打包进 Flink 包中,而是单独进行了打包;所以如果我们需要在 Flink 中使用到 Kafka,需要将这个包引入到我们的 pom.xml 文件中。
编辑 pom.xml 文件的 dependencies 部分,修改完成后应该像下面一样:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
我们可以看到 pom.xml 文件中已经加入了 flink-connector-wikiedits_2.11 依赖。
4、编写 Flink 程序,实现计算结果写入 Kafka
程序实现了:流式计算每个用户在一个特定时间窗口内(比如说 5 秒钟)增加或者删除内容的字节数。
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
//FLINK 流计算拓扑任务代码分析
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
//首先,构建一个 StreamExecutionEnvironment
//用来设置运行参数
//当从外部系统读取数据的时候,也被用来创建源(sources)
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
//读取 Wikipedia IRC 日志的源(sources)
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
//在本案例中,我们关心的是每个用户在一个特定时间窗口内(比如说 5 秒钟)增加或者删除内容的字节数。
//为了实现这个目标,我们需要指定用户名作为数据流的 key 字段,也就是说在这个数据流上的操作应该考虑到用户名。
//在我们的案例中需要对时间窗口中每个唯一用户的编辑字节数求和。为了使数据流包含 key,我们需要提供一个 KeySelector.ng
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
//它创建了一个 WikipediaEditEvent 流,以用户名作为 String 类型的 key。
//现在我们可以在这个流上指定窗口并且基于这些窗口内的数据计算出结果。
//一个窗口指定了要执行计算的数据流的一个分片。当需要在一个无边界的数据流上执行聚合计算时,窗口是必不可少的。
//在我们的案例中,我们想要做的就是每 5 秒钟一个窗口对编辑字节数做聚合计算
DataStream<Tuple2<String, Long>> result = keyedEdits
//指定了我们需要一个大小为 5 秒钟的滚动窗口(非重叠窗口)
.timeWindow(Time.seconds(5))
//调用的第二个方法指定了对每个窗口分片中每个唯一的 key 做 Fold transformation 转换。
.fold(
new Tuple2<>("", 0L), //初始值
new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
//将结果流在终端输出
//result.print();
//将结果输出到 Kafka
result.
map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> tuple) {
return tuple.toString();
}
})
.addSink(new FlinkKafkaProducer09<>(
"node1.bddata.net:9092,node2.bddata.net:9092,node3.bddata.net:9092",
"wiki-result", new SimpleStringSchema()
));
//开始执行计算
//像前面的创建数据源,转换和 Sinks 操作仅仅是构建了一个内部操作图。
//只有当 execute()被调用的时候,这个操作图才会被扔在集群或者在你的本地机器运行。
see.execute();
}
}
5、程序打包并运行
你可以在你的 IDE 或者命令行下使用 Maven 打包。
$ mvn clean package
然后上传 jar 到服务器。
提交作业jar包到集群并运行:
$ bin/flink run -c wikiedits.WikipediaAnalysis wiki-edits-0.1.jar
作业运行情况:
作业运行分析:
执行的入口从用户程序的 execute()函数入手,execute()的源码如下:
public JobExecutionResult execute(String jobName) throws Exception {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();
. . . . . . .
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
try {
exec.start();
return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
}
finally {
transformations.clear();
exec.stop();
}
}
函数内部主要有 getStreamGraph()、getJobGraph()、exec.start()、exec.submitJobAndWait()等。getStreamGraph()的作用是生成 StreamGraph 图,getJobGraph()的作用是生成 JobGraph 的图,exec.start()的作用是建立 Client、JobManager、TaskManager 三者之间通信初始化,exec.submitJobAndWait()的作用提交 job 并且等待 job 执行后的结果,该函数提供了任务执行调度执行的入口,进入 Client 类中,首先执行 createUserCodeClassLoader()函数,创建用户代码的加载器,然后执行 jobClient.SubmitJobAndWait(),进入 JobClient 类,在函数内部会执行 submit 函数,从该函数开始进入 AKKA 通信阶段,首先会进入 JobClientActor,会创建一个 jobclientActor 来对 JobManager 和 client 进行通信,当通信对象创建之后,会执行 akka 机制的 ask 函数,该函数的作用是发出一个消息,然后要求收到方给予回复。当消息发出之后,OnReceive()函数会收到 actor 发出的消息请求,然后调用 handleMessage()方法来处理消息请求,该函数内部有 connectToJobManager()方法,此方法内部的 tryToSubmitJob()函数是正式提交任务的操作,主要做的工作就是 uploadUserJars()上传用户程序的 jar 文件,接着会 jobManager.tell()向 JobManager 发出一个 submit 消息请求。
当 JobManager 收到 Client 发送的消息之后,会执行 JobManager 内部的 submitJob 方法:
case SubmitJob(jobGraph, listeningBehaviour) =>
val client = sender()
val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
jobGraph.getSessionTimeout)
log.info("liuzf---开始执行 JobManager 的 submitJob()")
submitJob(jobGraph, jobInfo)
首先会把由 client 收到的 job 信息封装在 jobinfo 中,然后把 jobinfo 以及 job 的任务图 jobGraph 一起发送给 submit()去执行,在 JobManager 的 submit 函数中处理的函数逻辑比较复杂,比较重要的函数执行过程如下:
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
try {
libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
jobGraph.getClasspaths)
}
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
}
executionGraph = ExecutionGraphBuilder.buildGraph()
try {
submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
jobInfo.notifyClients(
decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
log.info(s"开始调度 job $jobId ($jobName).")
executionGraph.scheduleForExecution()
首先执行 libraryCacheManager.registerJob(),向 CacheManager 进行注册,请求缓存,然后执行 getClassLoader()来加载用户的代码加载器,接下来会调用 ExecutionGraph 中的 buildGraph()构造 ExecutionGraph 的并行化版本的执行图,当逻辑执行图构造完毕之后,这时候可以通知 Client 任务已经成功提交,并且提交过程结束。接下来会调用 sheduleForExecution()来会整体的资源进行调度分配,主要是每个 TaskManager 中的 slot 的分配,并且当 slot 分配完成之后,所有的 task 的任务状态发生改变,由 CREATEDàSCHEDULED。接下分配完之后,接下来执行 depolyToSlot()函数,就要进入部署状态,同样会执行 transitionState()函数,将 SCHEDULED 状态变为 DEPOLYING 状态,接着的重要函数是 shumitTask()函数,该函数会通过 AKKA 机制,向 TaskManager 发出一个 submitTask 的消息请求,TaskManager 收到消息请求后,会执行 submitTask()方法,该函数的重要执行过程如下:
public submitTask(){
val task = new Task(. . . .)
log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
val execId = tdd.getExecutionAttemptId
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
runningTasks.put(execId, prevTask)
throw new IllegalStateException("TaskM}anager already contains a task for id " + execId)
}
task.startTaskThread()
sender ! decorateMessage(Acknowledge.get())
}
首先执行 Task 的构造函数,生成具体物理执行的相关组件,比如 ResultPartition 等,最后创建执行 Task 的线程,然后调用 startTaskThread()来启动具体的执行线程,Task 线程内部的 run()方法承载了被执行的核心逻辑,该方法具体的内容为:
public void run() {
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
break;
}
}
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
network.registerTask(this);
Environment env = new RuntimeEnvironment(. . . . );
invokable.setEnvironment(env);
// ----------------------------------------------------------------
// actual task core work
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
notifyObservers(ExecutionState.FINISHED, null);
}
Finally{
// free the network resources
network.unregisterTask(this);
// free memory resources
if (invokable != null) {
memoryManager.releaseAll(invokable);
}
libraryCache.unregisterTask(jobId, executionId);
removeCachedFiles(distributedCacheEntries, fileCache);
首先执行 transitionState()函数将 TaskManager 的状态由 CREATED 转变为 DEPOLYING 状态,然后调用 loadAndTrantiateInvokable()对用户代码打包成 jar 包,并且生成用户代码加载器,然后执行 network.registerTask(),执行该函数之前,会执行 NetworkEnvironment 的构造函数,该类是 TaskManager 通信的主对象,主要用于跟踪中间结果并负责所有的数据交换,在该类中会创建协助通信的关键部件,比如网络缓冲池,连接管理器,结果分区管理器,结果分区可消费通知器等。当网络对象准备完成后,创建一个运行环境,然后执行 invoke.setEnvironment(env),将各种配置打包到运行环境中。
当运行环境准备之后,接下来到了具体分析任务执行的时候,首先会调用 transitionState()函数将任务状态由 DEPOLYING 改为 RUNNING 状态,然后会调用 notifyObservers()通知所有的 task 观察者也改变状态,然后执行 setContextClassLoader()将执行的类加载器设置为用户执行的加载器,然后执行 invokable.invoke(),该函数是分界点,执行前用户逻辑没有被触发,执行之后说明用户逻辑已完成。当执行完成之后,调用 transitionState()函数执行的 RUNNING 状态改成 FINISHED 状态。同样调用 notifyObservers()来通知其他观察者改变状态,最后,释放资源。
6、总结
Flink特点:
支持高吞吐、低延迟、高性能的流处理
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
支持迭代计算
Storm、Spark、Flink三者区别:Spark是准实时的流式计算,Storm是真正的流式计算,Flink是更快的真正的流式计算,Spark streaming是更快的批处理,而Flink Batch是有限数据的流式计算。Flink适用于实时流式数据处理、离线数据处理、DataLake计算等场景。
数据流:
App、微信、H5、PC等应用上的埋点数据 --> Flume --> Kafka --> Flink --> Kafka/KV缓存/分布式存储 --> 数据应用(推荐、看板、机器学习...)
Binlog日志 --> 解析系统 --> Kafka --> Flink --> Kafka/KV缓存/分布式存储 --> 数据应用
7、常见问题处理方式
1). 当 Source 是 Kafka 的时候,如何设置 Source Operator 的并发度?
如果没有指定,Source Operator 的个数与集群中的 TaskManager 的个数相等。如果手动设置,建议使用的 slot 个数=Kafka Partition 的个数/TaskManager 的个数。此时,Slot 的个数需大于等于 2。因为其中有一个 Source Operator。也不建议在一个 Slot 中启用多线程。
2). Barrier 如果丢失了怎么办?
因为 Barrier 是从 Source 开始周期性的发送的,所以过一段时间未被标记为阻塞的 input channel 会收到下一个 checkpoint 的 barrier,这时 Flink 会进行比对,发现如果当前的检查点没有完成,但下一个 checkpoint 已经过来了,那么 Flink 会放弃当前的 checkpoint,转而使用下一个 checkpoint。
3). 在 Flink UI 上 Cancel Job,Job 所有的任务都会停止吗?
答:不是。Cancel 按钮只是把 Source,Transform 和 Sink 这些 Operator 停掉,对应的线程停掉。但整个 TaskManager 还在。所以,如果 Job 中如果有不在 Operator 中初始化 Spring 容器,那么即便 Cancel Job 以后,这些对象依然存在。所以,正确的姿势是在 Operator 的 open()方法中初始化 Spring 容器。在 close()方法中释放这些资源。
4). Job 运行过程中 TaskManager 挂了怎么办?
如果 TaskManager 挂了,Flink 会先将 Job cancel 掉。然后再以相同的 JobID,往集群中仍然存活的 TaskManager 上部署 Job,这时候,如果还有足够的 task slot,则 Job 能够恢复。但是这时候会有一个问题:部署在某些 TaskManager 上的 Task 数会比之前多,造成了这些 TaskManager 的负载较重,可能还是会出现问题。这时候就需要尽快恢复挂掉的 TaskManager。
5). 某条数据在 Input Channel 之间传输失败了怎么办?
会抛出 Exception,然后 Job 会重启。
6). Flink 读取 Kafka 时,Checkpoint 设置多久合适?
快照本身都是非常轻量级的,一般都在几 M 或者几十 M。如果快照过大,比如几百 M 甚至更多,就会对程序运行产生影响。官方给出的例子是几秒钟一次,具体可视 Job 情况决定。
7). Checkpoint 和 Savepoint 有什么区别?
savepoint 可以理解为是一种特殊的 checkpoint,savepoint 就是指向 checkpoint 的一个指针,需要手动触发,而且不会过期,不会被覆盖,除非手动删除。正常情况下的线上环境是不需要设置 savepoint 的。除非对 job 或集群做出重大改动的时候,需要进行测试运行。
8). Flink 的 Operator 不能带成员变量?
Flink operator function 中不能带没有实现 flink 序列化的成员变量。因为 flink 本身自己有一套序列化方式,在任务提交执行的时候会有 validation,如果把没有实现 flink 序列化的类作为成员变量,就会提交任务报错。目前的解决方案是将 operator function 与实际业务逻辑分离。或将成员变为 static。
9). 每个 TaskManager 设置多少个 TaskSlot 合适?
建议为 CPU 核数个。
10). TaskManager 中的 BufferPool 不够了咋办?
需要增大配置项:taskmanager.network.numberOfBuffers 的值,该值表示网络栈 buffer 的数量,它的大小表示在同一时刻该 TaskManager 能够拥有的流处理的数据交换的 channel 数。
11). Job 运行中出现了 OOM
说明保留的空间不够,这时需减少中间层的空间大小,通过配置降低 taskmanager.memory.fraction 的值来减少中间层的内存占比。该值表示 Flink 用于管理底层 buffer 所占用的内存比例。
12). Job 的并行度如何设置?
将所有的 transform operator 和 sink operator 的 parallism 设置成一样的,source operator 的 parallism 根据 source 而定。这样的话,flink 会自动把 transform operator 和 sink operator 都 merge 成一个 piple line 去运行。那么这时候一个 job 就变成只有两个 operator 了,source operator 和 merge 后的 operator,这个 pipeline operator 中间就没有 buffer 了,性能最优。
以上是关于Flink流计算开发的主要内容,如果未能解决你的问题,请参考以下文章