Flink的安装部署及WordCount测试
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的安装部署及WordCount测试相关的知识,希望对你有一定的参考价值。
一、本地模式
在本地以多线程的方式模拟Flink中的多个角色。(开发环境不用)
下载地址:https://flink.apache.org/downloads.html
这里下载的是:flink-1.13.0-bin-scala_2.12.tgz
上传到常用的位置,然后解压。
启动:
切换到flink的bin目录下,执行./start-cluster.sh,然后查看进程。
二、Standalone 独立集群模式
(如果先做了第一步,记得先停止服务,stop-cluster.sh)
- 上传、解压tar包。
- 修改配置文件
cd conf/flink-conf.yaml
指定Flink集群JobManager RPC通信地址。
(我这里的环境是三台机器,分别为master,slave1,slave2)
jobmanager.rpc.address: master
注意:flink-conf.yaml 中配置key/value 时候在“:”后面需要有一个空格,否则配置不会生效。
其他重要配置解释:
# jobmanager 和taskmanager 通信的端口号
jobmanager.rpc.port: 6123
# JobManager 的总进程内存大小
jobmanager.memory.process.size: 1600m
#当前taskmanager 整个进程占用的内存是多少
taskmanager.memory.process.size: 1728m
#每个taskmanager 可以提供的slots
taskmanager.numberOfTaskSlots: 1
#默认的并行度
parallelism.default: 1
#jobmanager 故障转移策略,1.9 之后出现的新属性,区域性恢复策略
jobmanager.execution.failover-strategy: region
#flink web 界面的端口号
rest.port: 8081
vi conf/workers
指定Flink 集群TaskManager。
slave1
slave2
- 发送到其他两个节点。
scp -r flink-1.13.0 slave1:$PWD
scp -r flink-1.13.0 slave2:$PWD
- 配置环境变量
vi /etc/profile
export FLINK_HOME=/usr/software/flink-1.13.0
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
-
启动集群
-
查看Flink节点进程
可以通过master:8081访问Flink的web界面
三、Standalone HA
从之前的架构中我们可以发现JobManager 有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦JobManager出现意外,其后果可想而知。
我们借助在Zookeeper 的帮助下,一个Standalone 的Flink 集群会同时有多个活着的JobManager,其中只有一个处于工作状态,其他处于Standby 状态。当工作中的JobManager 失去连接后(如宕机或Crash),Zookeeper 会从Standby中选一个新的JobManager 来接管Flink 集群。
-
集群规划
master:JobManager+TaskManager
slave1: JobManager+TaskManager
slave2: JobManager -
修改配置文件
vi masters
master:8081
slave1:8081
vi worksers
master
slave1
slave2
vi flink-conf.yaml
添加以下内容
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master:9000/flink/ha/
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
- 同步flink配置文件目录conf 到其他两个节点上
scp -r conf slave1:$PWD
scp -r conf slave2:$PWD
-
修改slave1上的jobmanager通信地址
vi flink-conf.yaml
jobmanager.rpc.address: slave1 -
启动
启动HDFS
启动Zookeeper
将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 拷贝到flink 的lib 目录下,三台节点都拷贝。
启动flink
- 查看进程
7.测试
yum install -y nc
nc -lk 1234
四、Flink on yarn
4.1.1、flink与yarn的交互
在实际开发过程中使用Flink on yarn 模式比较多
4.1.2、配置
关闭yarn的内存检查,yarn-site.xml。并分发给其他节点。
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
分发
scp -r yarn-site.xml slave1:$PWD
scp -r yarn-site.xml slave2:$PWD
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。在这里面我们需要关闭,因为对于flink 使用yarn模式下,很容易内存超标,这个时候yarn 会自动杀掉job。
重新启动所有服务
dhfs
yarn
zookeeper
flink
4.2、Session模式
- 特点:需要事先申请资源,启动JobManager 和TaskManger
- 优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
- 缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源
- 应用场景:适合作业递交比较频繁的场景,小作业比较多的场景
4.2.1、案例
yarn-session.sh(开辟资源) + flink run(提交任务)
在yarn 上启动一个Flink 会话,执行以下命令:
yarn-session.sh -n 2 -tm 800 -s 1 -d flink run /usr/software/flink-1.13.0/examples/batch/WordCount.jar
参数说明:
# -n 表示申请2 个容器,这里指的就是多少个taskmanager
# -tm 表示每个TaskManager 的内存大小
# -s 表示每个TaskManager 的slots 数量
# -d 表示以后台程序方式运行
在yarn 的8088 界面可以看到Flink session cluster 正在运行。
任务完成
4.3、 Per-job模式
- 特点:每次递交作业都需要申请一次资源
- 优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源
- 缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
- 应用场景:适合作业比较少的场景、大作业的场景
4.3.1、案例
直接提交job
flink run -m yarn-cluster -yjm 1024 -ytm 1024 /usr/software/flink-1.13.0/examples/batch/WordCount.jar
参数说明
# -m jobmanager 的地址
# -yjm 1024 指定jobmanager 的内存信息
# -ytm 1024 指定taskmanager 的内存信息
查看具体参数说明:
flink --help
五、Flink测试WordCount
-
安装netcat
yum install -y nc -
开启nc
nc -lk 1314 -
统计该端口输入的词频
flink run /usr/software/flink-1.13.0/examples/streaming/SocketWindowWordCount.jar --hostname master --port 1314
在1314端随意输入
ctrl+c 结束之后,另一端提示任务完成
- 查看统计结果
在Flink 的TaskManager 节点的log 目录下。查看以.out 结尾的文件。
以上是关于Flink的安装部署及WordCount测试的主要内容,如果未能解决你的问题,请参考以下文章
MacOS下安装Apache Flink及测试WordCount
第02讲:Flink 入门程序 WordCount 和 SQL 实现
使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响