大数据Flink进阶:Flink集群部署
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据Flink进阶:Flink集群部署相关的知识,希望对你有一定的参考价值。
文章目录
Flink的安装和部署主要分为本地(单机)模式和集群模式,其中本地模式只需直接解压就可以使用,不用修改任何参数,一般在做一些简单测试的时候使用。本地模式在这里不再赘述。集群部署模式主要包含Standalone、Hadoop Yarn 、Kubernetes等,Flink可以借助以上资源管理器来实现分布式计算,目前企业使用最多的是Flink 基于Hadoop Yarn资源管理器模式,下面我们重点讲解Flink 基于Standalone集群、Yarn资源管理器以及Kubernetes集群部署方式。
一、Standalone集群部署
1、节点划分
通过Flink运行时架构小结,我们知道Flink集群是由一个JobManager(Master)节点和多个TaskManager(Worker)节点构成,并且有对应提交任务的客户端。这里部署Standalone集群基于Linux Centos7.6版本,选择4台节点进行部署Flink,其中3台节点Standalone集群节点、一台节点是提交Flink任务的客户端,各个节点需要满足以下特点:
- 各节点安装java8版本及以上jdk(这里选择jdk8)。
- 各个节点之间需要两两免密。
4台节点角色划分如下:
节点IP | 节点名称 | Flink服务 |
---|---|---|
192.168.179.4 | node1 | JobManager,TaskManager |
192.168.179.5 | node2 | TaskManager |
192.168.179.6 | node3 | TaskManager |
192.168.179.7 | node4 | client |
2、standalone集群部署
我们可以从Flink的官网下载Flink最新的安装包,这里选择Flink1.16.0版本,Flink安装包下载地址:https://flink.apache.org/downloads.html#apache-flink-1160。Standalone集群部署步骤如下:
上传压缩包解压
将Flink的安装包上传到node1节点/software下并解压:
[root@node1 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz
配置Master节点
配置Master节点就是配置JobManager节点,在$FLINK_HOME/conf/masters文件中配置jobManager节点如下:
#vim $FLINK_HOME/conf/masters
node1:8081
配置Worker节点
配置Worker节点就是配置TaskManager节点,在$FLINK_HOME/conf/workers文件中配置taskManager节点如下:
#vim $FLINK_HOME/conf/workers
node1
node2
node3
配置 flink-conf.yaml 文件
在node1节点上进入到FLINK_HOME/conf目录下,配置flink−conf.yaml文件(vimFLINK_HOME/conf/flink-conf.yaml配置如下内容),内容如下:
# JobManager地址
jobmanager.rpc.address: node1
# JobManager地址绑定设置
jobmanager.bind-host: 0.0.0.0
# TaskManager地址绑定设置
taskmanager.bind-host: 0.0.0.0
# TaskManager地址(不同TaskManager节点host配置对应的host)
taskmanager.host: node1
# 设置每个TaskManager 的slot个数
taskmanager.numberOfTaskSlots: 3
# WEB UI 节点(只需JobManager节点设置,TaskManager节点设置了也无所谓)
rest.address: node1
# WEB UI节点绑定设置(只需JobManster节点设置)
rest.bind-address: 0.0.0.0
注意:以上设置的0.0.0.0代表监听当前节点每一个可用的网络接口,0.0.0.0不再是一个真正意义上的ip地址,而表示一个集合,监听0.0.0.0的端口相当于是可以监听本机中的所有ip端口。以上配置的0.0.0.0 表示想要让外部访问需要设置具体ip,或者直接设置为"0.0.0.0"。
分发安装包并配置 node2 、 node3 节点 flink-conf.yaml 文件
#分发到node2、node3节点上
[root@node1 ~]# scp -r /software/flink-1.16.0 node2:/software/
[root@node1 ~]# scp -r /software/flink-1.16.0 node3:/software/
#修改node2、node3 节点flink-conf.yaml文件中的TaskManager
【node2节点】 taskmanager.host: node2
【node3节点】 taskmanager.host: node3
#注意,这里发送到node4,node4只是客户端
[root@node1 ~]# scp -r /software/flink-1.16.0 node4:/software/
启动Flink 集群
#在node1节点中,启动Flink集群
[root@node1 ~]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./start-cluster.sh
访问Flink WebUI
https://node1:8081,进入页面如下:
3、任务提交测试
Standalone集群搭建完成后,可以将Flink任务提交到Flink Standalone集群中运行。有两种方式提交Flink任务,一种是在WebUI界面上提交Flink任务,一种方式是通过命令行方式。
这里编写读取Socket数据进行实时WordCount统计Flink任务提交到Flink集群中运行,这里以Flink Java代码为例来实现,代码如下:
/**
* 读取Socket数据进行实时WordCount统计
*/
public class SocketWordCount
public static void main(String[] args) throws Exception
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取Socket数据
DataStreamSource<String> ds = env.socketTextStream("node5", 9999);
//3.准备K,V格式数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) ->
String[] words = line.split(",");
for (String word : words)
out.collect(Tuple2.of(word, 1));
).returns(Types.TUPLE(Types.STRING, Types.INT));
//4.聚合打印结果
tupleDS.keyBy(tp -> tp.f0).sum(1).print();
//5.execute触发执行
env.execute();
以上代码编写完成后,在对应的项目Maven pom 文件中加入以下plugin:
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<!-- 设置false后是去掉 xxx-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>xx.xx.xx</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
然后使用Maven assembly 插件对项目进行打包,得到"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"完整jar包。
此外,代码中读取的是node5节点scoket 9999端口数据,需要在node5节点上安装nc组件:
[root@node5 ~]# yum -y install nc
- 命令行提交Flink任务
在node1 上启动 Flink Standalone 集群
[root@node1 bin]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./start-cluster.sh
在node5 节点上启动 nc socket 服务
[root@node5 ~]# nc -lk 9999
将打好的包提交到Flink 客户端 node4 节点 /root 目录下并提交任务
[root@node4 ~]# cd /software/flink-1.16.0/bin/
#向Flink集群中提交任务
[root@node4 bin]# ./flink run -m node1:8081 -c com.mashibing.flinkjava.code.lesson03.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
进入Flink WebUI 界面查看任务和结果
#向node5 socket 9999 端口写入以下数据
hello,a
hello,b
hello,c
hello,a
WebUI查看对应任务和结果
登录Flink WebUI http://node1:8081查看对应任务执行情况。
WebUI查看执行结果:
在WebUI中点击对应的任务Job,进入如下页面点击"Cancel Job"取消任务执行:
- Web界面提交 Flink 任务
向Flink集群提交任务还可以通过WebUI方式提交。点击上传jar包,进行参数配置,并提交任务。
提交任务之后,可以通过WebUI页面查看提交任务,输入数据之后可以在对应的TaskManager节点上看到相应结果。
二、Flink On Yarn
Flink可以基于Yarn来运行任务,Yarn作为资源提供方,可以根据Flink任务资源需求动态的启动TaskManager来提供资源。Flink基于Yarn提交任务通常叫做Flink On Yarn,Yarn资源调度框架运行需要有Hadoop集群,Hadoop版本最低是2.8.5。
1、Flink不同版本与Hadoop整合
Flink基于Yarn提交任务时,需要Flink与Hadoop进行整合。Flink1.8版本之前,Flink与Hadoop整合是通过Flink官方提供的基于对应hadoop版本编译的安装包来实现,例如:flink-1.7.2-bin-hadoop24-scala_2.11.tgz,在Flink1.8版本后不再支持基于不同Hadoop版本的编译安装包,Flink与Hadoop进行整合时,需要在官网中下载对应的Hadoop版本的"flink-shaded-hadoop-2-uber-x.x.x-x.x.jar"jar包,然后后上传到提交Flink任务的客户端对应的$FLINK_HOME/lib中完成Flink与Hadoop的整合。
在Flink1.11版本之后不再提供任何更新的flink-shaded-hadoop-x jars,Flink与Hadoop整合统一使用基于Hadoop2.8.5编译的Flink安装包,支持与Hadoop2.8.5及以上Hadoop版本(包括Hadoop3.x)整合。在Flink1.11版本后与Hadoop整合时还需要配置HADOOP_CLASSPATH环境变量来完成对Hadoop的支持。
2、Flink on Yarn 配置及环境准备
Flink 基于Yarn提交任务,向Yarn集群中提交Flink任务的客户端需要满足以下两点
- 客户端安装了Hadoop2.8.5+版本的hadoop。
- 客户端配置了HADOOP_CLASSPATH环境变量。
这里选择node5节点作为提交Flink的客户端,该节点已经安装了Hadoop3.3.4版本,然后在该节点中配置profile文件,加入以下环境变量:
# vim /etc/profile,加入以下配置
export HADOOP_CLASSPATH=`hadoop classpath`
#source /etc/profile 使环境变量生效
[root@node5 ~]# source /etc/profile
然后将Flink的安装包上传到node5节点/software下并解压:
[root@node5 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz
3、任务提交测试
基于Yarn运行Flink任务只能通过命令行方式进行任务提交,Flink任务基于Yarn运行时有几种任务提交部署模式(后续章节会进行介绍),下面以Application模式来提交任务。步骤如下:
- 启动HDFS集群
#在 node3、node4、node5节点启动zookeeper
[root@node3 ~]# zkServer.sh start
[root@node4 ~]# zkServer.sh start
[root@node5 ~]# zkServer.sh start
#在node1启动HDFS集群
[root@node1 ~]# start-all.sh
- 将 Flink 任务对应的 jar 包上传到 node5 节点
这里的Flink任务还是以读取Socket数据做实时WordCount任务为例,将打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包上传到node5节点的/root/目录下。
- 在node5 节点执行如下命令运行 Flink 作业
[root@node5 ~]# cd /software/flink-1.16.0/bin/
# 提交Flink任务
[root@node5 bin]#./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
- 查看WebUI及运行结果
Flink任务Application模式提交后,浏览器输入https://node1:8088登录Yarn WebUI,找到提交的任务,点击对应的Tracking UI"ApplicationMaster"进入到Flink WEBUI任务页面。
向node5 scoket 9999端口输入以下数据并在对应的WebUI中查看结果:
#向node5 socket 9999 端口写入以下数据
hello,a
hello,b
hello,c
hello,a
在WebUI中找到对应的Flink TaskManager节点 Stdout输出,结果如下:
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
大数据(9b)Flink集群部署
文章目录
1、下载地址
https://archive.apache.org/dist/flink/
2、解压
tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C $B_HOME/
cd $B_HOME
mv flink-1.10.1 flink
3、环境变量
https://yellow520.blog.csdn.net/article/details/112692486
export FLINK_HOME=$B_HOME/flink
4、打包上传一个Flink代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WordCount {
public static void main(String[] args) throws Exception {
// 流执行环境
StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
// 从网络中读取数据
DataStream<String> inputDataStream = e.socketTextStream("localhost", 7777);
// 分词,平化,分组,合计
DataStream<Tuple2<String, Integer>> ds;
ds = inputDataStream.flatMap(new MyFlatMap()).keyBy(0).sum(1);
// 打印,设置并行度
ds.print();
// 执行
e.execute();
}
}
5、开启网络数据传输端口
nc -lk 7777
6、Flink的YARN模式
把Flink的Hadoop包放到lib
下
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
ll $FLINK_HOME/lib
启动Hadoop
hadoop.py start
6.1、会话模式(Session Mode)
在YARN中初始化一个Flink集群,开辟指定的资源,以后提交任务都向这里提交。
这个flink集群会常驻在YARN集群中
6.1.1、开启会话
$FLINK_HOME/bin/yarn-session.sh \\
-s 2 \\
-jm 1024 \\
-tm 1024 \\
-nm a1 \\
-d
参数 | 说明 |
---|---|
-s (--slots ) | 每个TaskManager的slot数量 |
-jm | JobManager的内存(单位MB) |
-tm | 每个TaskManager的内存(单位MB) |
-nm | YARN上应用程序名字 |
-d | 后台执行 |
6.1.2、运行jar包
$FLINK_HOME/bin/flink run -c WordCount FlinkPractise-1.0-SNAPSHOT.jar
结果
如图示5个单词,Records
也是5
6.1.3、关闭会话
yarn application --list
yarn application --kill application_1626235724262_0001
6.2、任务独立提交模式(Per-Job Cluster Mode)
每次提交都会创建一个新的Flink集群,任务之间互相独立
任务执行完成之后创建的集群也会消失
多了个-m yarn-cluster
$FLINK_HOME/bin/flink run \\
-m yarn-cluster \\
-c WordCount \\
FlinkPractise-1.0-SNAPSHOT.jar
7、Appendix
en | 🔉 | cn |
---|---|---|
session | ˈseʃn | n. 会议;(法庭的)开庭;(议会等的)开会;学期;会话 |
slot | slɒt | n. 位置;狭槽;水沟;硬币投币口;vt. 跟踪 |
slots | slɒts | n. 插槽 |
以上是关于大数据Flink进阶:Flink集群部署的主要内容,如果未能解决你的问题,请参考以下文章
[3] Flink大数据流式处理利剑: Flink的部署架构