Flink的安装部署及WordCount测试

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的安装部署及WordCount测试相关的知识,希望对你有一定的参考价值。

一、本地模式

在本地以多线程的方式模拟Flink中的多个角色。(开发环境不用)
下载地址:https://flink.apache.org/downloads.html
这里下载的是:flink-1.13.0-bin-scala_2.12.tgz
上传到常用的位置,然后解压。
启动:
切换到flink的bin目录下,执行./start-cluster.sh,然后查看进程。

二、Standalone 独立集群模式


(如果先做了第一步,记得先停止服务,stop-cluster.sh)

  1. 上传、解压tar包。
  2. 修改配置文件
    cd conf/flink-conf.yaml
    指定Flink集群JobManager RPC通信地址。
    (我这里的环境是三台机器,分别为master,slave1,slave2)
jobmanager.rpc.address: master

注意:flink-conf.yaml 中配置key/value 时候在“:”后面需要有一个空格,否则配置不会生效。
其他重要配置解释:

# jobmanager 和taskmanager 通信的端口号
jobmanager.rpc.port: 6123
# JobManager 的总进程内存大小
jobmanager.memory.process.size: 1600m
#当前taskmanager 整个进程占用的内存是多少
taskmanager.memory.process.size: 1728m
#每个taskmanager 可以提供的slots
taskmanager.numberOfTaskSlots: 1
#默认的并行度
parallelism.default: 1
#jobmanager 故障转移策略,1.9 之后出现的新属性,区域性恢复策略
jobmanager.execution.failover-strategy: region
#flink web 界面的端口号
rest.port: 8081

vi conf/workers
指定Flink 集群TaskManager。

slave1
slave2
  1. 发送到其他两个节点。
scp -r flink-1.13.0 slave1:$PWD
scp -r flink-1.13.0 slave2:$PWD
  1. 配置环境变量
    vi /etc/profile
export FLINK_HOME=/usr/software/flink-1.13.0
export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

  1. 启动集群

  2. 查看Flink节点进程

    可以通过master:8081访问Flink的web界面

三、Standalone HA

从之前的架构中我们可以发现JobManager 有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦JobManager出现意外,其后果可想而知。
我们借助在Zookeeper 的帮助下,一个Standalone 的Flink 集群会同时有多个活着的JobManager,其中只有一个处于工作状态,其他处于Standby 状态。当工作中的JobManager 失去连接后(如宕机或Crash),Zookeeper 会从Standby中选一个新的JobManager 来接管Flink 集群。

  1. 集群规划
    master:JobManager+TaskManager
    slave1: JobManager+TaskManager
    slave2: JobManager

  2. 修改配置文件
    vi masters

master:8081
slave1:8081

vi worksers

master
slave1
slave2

vi flink-conf.yaml
添加以下内容

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master:9000/flink/ha/
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
  1. 同步flink配置文件目录conf 到其他两个节点上
scp -r conf slave1:$PWD
scp -r conf slave2:$PWD
  1. 修改slave1上的jobmanager通信地址
    vi flink-conf.yaml
    jobmanager.rpc.address: slave1

  2. 启动
    启动HDFS
    启动Zookeeper

将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 拷贝到flink 的lib 目录下,三台节点都拷贝。

启动flink

  1. 查看进程

7.测试

yum install -y nc
nc -lk 1234

四、Flink on yarn

4.1.1、flink与yarn的交互

在实际开发过程中使用Flink on yarn 模式比较多

4.1.2、配置

关闭yarn的内存检查,yarn-site.xml。并分发给其他节点。

<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

分发

scp -r yarn-site.xml slave1:$PWD
scp -r yarn-site.xml slave2:$PWD

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。在这里面我们需要关闭,因为对于flink 使用yarn模式下,很容易内存超标,这个时候yarn 会自动杀掉job。

重新启动所有服务
dhfs
yarn
zookeeper
flink

4.2、Session模式

  • 特点:需要事先申请资源,启动JobManager 和TaskManger
  • 优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
  • 缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源
  • 应用场景:适合作业递交比较频繁的场景,小作业比较多的场景

4.2.1、案例

yarn-session.sh(开辟资源) + flink run(提交任务)
在yarn 上启动一个Flink 会话,执行以下命令:

yarn-session.sh -n 2 -tm 800 -s 1 -d flink run /usr/software/flink-1.13.0/examples/batch/WordCount.jar

参数说明:

# -n 表示申请2 个容器,这里指的就是多少个taskmanager
# -tm 表示每个TaskManager 的内存大小
# -s 表示每个TaskManager 的slots 数量
# -d 表示以后台程序方式运行

在yarn 的8088 界面可以看到Flink session cluster 正在运行。

任务完成

4.3、 Per-job模式

  • 特点:每次递交作业都需要申请一次资源
  • 优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源
  • 缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
  • 应用场景:适合作业比较少的场景、大作业的场景

4.3.1、案例

直接提交job

flink run -m yarn-cluster -yjm 1024 -ytm 1024 /usr/software/flink-1.13.0/examples/batch/WordCount.jar

参数说明

# -m jobmanager 的地址
# -yjm 1024 指定jobmanager 的内存信息
# -ytm 1024 指定taskmanager 的内存信息

查看具体参数说明:
flink --help

五、Flink测试WordCount

  1. 安装netcat
    yum install -y nc

  2. 开启nc
    nc -lk 1314

  3. 统计该端口输入的词频

flink run /usr/software/flink-1.13.0/examples/streaming/SocketWindowWordCount.jar --hostname master --port 1314

在1314端随意输入

ctrl+c 结束之后,另一端提示任务完成

  1. 查看统计结果

在Flink 的TaskManager 节点的log 目录下。查看以.out 结尾的文件。

以上是关于Flink的安装部署及WordCount测试的主要内容,如果未能解决你的问题,请参考以下文章

MacOS下安装Apache Flink及测试WordCount

第02讲:Flink 入门程序 WordCount 和 SQL 实现

05-flink-1.10.1-flink on yarn 流处理WordCount

Flink学习 批流版本的wordcount JAVA版本

Flink学习 批流版本的wordcount JAVA版本

使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响