从0到1Flink的成长之路

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路相关的知识,希望对你有一定的参考价值。

继续坚持!!!

 

Flink安装部署

  • 前言
  • 运行架构
  • 二、本地模式
  • Local Cluster 本地集群
    • 1.原理
    • 2.操作
    • 3.测试
  • Standalone 独立集群
  • Standalone HA 高可用集群
  • Flink On Yarn
  • 总结

 


前言

Flink支持多种安装模式
- Local:本地单机模式,学习测试时使用
- Standalone:独立集群模式,Flink 自带集群,开发测试环境使用
- Standalone HA:独立集群高可用模式,Flink自带集群,开发测试及生产环境使用
- On Yarn:计算资源统一由Hadoop YARN管理,生产环境使用

 

一、运行架构

Flink 应用程序运行时,应用提交、资源申请与任务分配、任务执行,由三部分组成:

FlinkClient: Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回(MapReduce Client:yarn jar hadoop-mapreduce.jar WordCount input ouput)

JobManager:主要负责调度 Job 并协调 Task 做 checkpoint,从 Client 处接收到 Job 和JAR 包等资源后,会生成优化后的执行计划,并以 Task 为单元调度到各个 TaskManager 去执行。(ResourceManager和ApplicationMaster或JobTracker)
TaskManager:在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立连接,接收数据(NodeManager或TaskTracker)

附录:MapReduce 1.0 架构组成

 

二、本地模式

本地模式LocalMode表示JobManager和TaskManager运行在一个JVM进程中。

1.启动shell交互式窗口

/export/server/flink/bin/start-scala-shell.sh local

2流计算,执行如下命令

senv.socketTextStream("node1.itcast.cn", 9999).flatMap(_.split("\\\\s+")).map((_, 1)).keyBy(0).sum(1).print()

3.准备文件/root/words.txt

vim /root/words.txt

//写入以下文本,注意空格符保持一致
flink flink spark spark flink hadoop
flink spark spark
flink hadoop flink
flink spark flink

4.批处理,执行如下命令

benv.readTextFile("/root/words.txt").flatMap(_.split("\\\\s+")).map((_,1)).groupBy(0).sum(1).print()

5.退出shell

:quit

三、Local Cluster 本地集群

Flink 应用运行Local Cluster本地集群环境,与Hadoop 框架伪分布式环境类似,所有进程Process运行在一台机器上,针对Flink框架来说,进程分别为JobManager(主节点,管理者)和TaskManager(从节点,干活着)。

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html#requirements

     1.原理

1. Flink程序由JobClient进行提交。

2. JobClient将作业提交给JobManager。

3. JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager。

4. TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成

5. 作业执行完成后,结果将发送回客户端(JobClient)。

2.操作

       1.下载安装包:https://archive.apache.org/dist/flink/flink-1.10.0/

       2.上传flink-1.10.0-bin-scala_2.11.tgz到node1的指定目录

       3.解压

tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
chown -R root:root /export/server/flink-1.10.0/

4.创建软连接

ln -s flink-1.10.0 flink

     3.测试

    1).本地集群启动与停止

         1.启动Flink本地集

/export/server/flink/bin/start-cluster.sh

         2.使用jps可以查看到下面两个进程

- TaskManagerRunner
- StandaloneSessionClusterEntrypoint

         3.访问Flink的Web UI:http://node1.itcast.cn:8081/#/overview

slot在Flink里面可以认为是资源组,Flink是通过将任务(Task)分成子任务(SubTask)并且将这些子任务分配到slot来并行执行程序。

         4.停止Flink

/export/server/flink/bin/stop-cluster.sh

2)、流计算:WordCount

         运行流式计算程序,从TCP Socket 读取数据,进行词频统计(类似StructuredStreaming)。

# 开启终端
nc -lk 9999
# 上传jar包至/export/server/flink目录
cd /export/server/flink
rz
# 开启终端,运行流式应用
bin/flink run --class cn.itcast.flink.StreamWordCount \\
/export/server/flink/StreamWordCount.jar \\
--host node1.itcast.cn --port 9999

         监控页面查看日志信息数据

3)、批处理:WordCount

         执行官方示例,读取文本文件数据,进行词频统计WordCount,将结果打印控制台或文件。

打印控制台:

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt

输出文件:

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar 
--input /root/words.txt --output /root/out.txt

Standalone 独立集群

Standalone集群类似Hadoop YARN集群,管理集群资源和分配资源给Flink应用运行任务Task。

     1原理

1. client客户端提交任务给JobManager;
2. JobManager负责申请任务运行所需要的资源并管理任务和资源;
3. JobManager分发任务给TaskManager执行;
4. TaskManager定期向JobManager汇报状态;

     2 操作

1.集群规划:
- 服务器: node1(Master + Slave): JobManager + TaskManager
- 服务器: node2(Slave): TaskManager
- 服务器: node3(Slave): TaskManager

2.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 虚拟机IP地址

3.修改masters

vim /export/server/flink/conf/masters
虚拟机IP地址:8081

4.修改slaves

vim /export/server/flink/conf/slaves
第一台虚拟机IP
第二台虚拟机IP
第三台虚拟机IP

5.添加HADOOP_CONF_DIR环境变量(集群所有机器)

vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
source /etc/profile

6.将Flink依赖Hadoop 框架JAR包上传至FLINK_HOME/lib目录

7.分发

scp -r /export/server/flink root@第二台虚拟机IP:/export/server
scp -r /export/server/flink root@第三台虚拟机IP:/export/server

     3 测试

1.启动HDFS集群,在第一台虚拟机上上执行如下命令

hadoop-daemon.sh start namenode
hadoop-daemons.sh start datanode

2.启动集群,在第一台虚拟机上执行如下命令

/export/server/flink/bin/start-cluster.sh

或者单独启动

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

3.访问Flink UI界面或使用jps查看
http://第一台虚拟机IP:8081/#/overview
TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManager的slots、内存、CPU Core是多少

4.执行官方测试案例

hdfs dfs -mkdir -p /wordcount/input/
hdfs dfs -put /root/words.txt /wordcount/input/

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \\
--input hdfs://node1.itcast.cn:8020/wordcount/input/words.txt

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \\
--input hdfs://node1.itcast.cn:8020/wordcount/input/words.txt \\
--output hdfs://node1.itcast.cn:8020/wordcount/output/result.txt \\
--parallelism 2

WEB UI截图如下:

五,Standalone HA 高可用集群

        从上述架构图中,可发现JobManager存在单点故障(SPOF),一旦JobManager出现意外,整个集群无法工作。为了确保集群的高可用,需要搭建Flink的HA。(如果是部署在YARN上,部署YARN的HA),演示如何搭建Standalone HA模式。

        1 原理

在 Zookeeper 的协助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中只有一个处于Active工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。

        2 操作

1.集群规划
- 服务器: node1(Master + Slave): JobManager + TaskManager
- 服务器: node2(Master + Slave): JobManager + TaskManager
- 服务器: node3(Slave): TaskManager

2.启动ZooKeeper

zookeeper-daemons.sh start

3.启动HDFS

hadoop-daemon.sh start namenode
hadoop-daemons.sh start datanode

4.停止集群

/export/server/flink/bin/stop-cluster.sh

5.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

增加如下内容:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1.itcast.cn:8020/flink-checkpoints
state.savepoints.dir: hdfs://node1.itcast.cn:8020/flink-savepoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1.itcast.cn:8020/flink-ha/
high-availability.zookeeper.quorum: node1.itcast.cn:2181.itcast.cn,node2:2181,node3.itcast.cn:2181

配置解释如下:

#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node1.itcast.cn:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node1.itcast.cn:8020/flink-ha/
# 配置ZK集群地址
high-availability.zookeeper.quorum: node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181

6.修改masters

vim /export/server/flink/conf/masters
第一台虚拟机IP:8081
第二台虚拟机IP:8081

7.同步配置文件

scp -r /export/server/flink/conf/flink-conf.yaml node2.itcast.cn:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3.itcast.cn:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2.itcast.cn:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3.itcast.cn:/export/server/flink/conf/

8.修改node2上的flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 第二台虚拟机IP

9.重新启动Flink集群,node1上执行

/export/server/flink/bin/start-cluster.sh

10.使用jps命令查看

发现没有Flink相关进程被启动

11. 下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar
https://flink.apache.org/downloads.html
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2- uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

        3 测试

1. 访问WebUI:

http://第一台虚拟机IP:8081/#/overview
http://第二台虚拟机IP:8081/#/overview

2.执行wc

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \\
--input hdfs://node1.itcast.cn:8020/wordcount/input/words.txt

3.kill掉其中一个master

4.重新执行wc,还是可以正常执行

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \\
--input hdfs://node1.itcast.cn:8020/wordcount/input/words.txt

Flink On Yarn

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的Workload,因此 Flink 也支持在 Yarn 集群运行。Flink on YARN前提:HDFS、YARN均启动。

    1.原理

         1)、为什么使用Flink On Yarn?

             在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
              -1.Yarn的资源可以按需使用,提高集群的资源利用率
              -2.Yarn的任务有优先级,根据优先级运行作业
             -3.基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

当Flink on YARN 运行时,有如下特点:

○ JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;
○ 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 
到其他机器;
○ 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn 
ResourceManager 申请资源,重新启动 TaskManager;

         2)、Flink如何与Yarn进行交互?

1.Client上传jar包和配置文件到HDFS集群上
2.Client向Yarn ResourceManager提交任务并申请资源
3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
          JobManager和ApplicationMaster运行在同一个container上;
          一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器);它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager);这个配置文件也被上传到HDFS上;

          此外,AppMaster容器也提供了Flink的web服务接口;
          YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink
4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

2 两种方式

         1)、Session 会话模式

特点:需要事先申请资源,启动JobManager和TaskManager
优点:不需要每次提交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源
应用场景:适合作业提交比较频繁的场景,小作业比较多的场景

         2)、Job 分离模式

特点:每次提交作业都需要申请一次资源

优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源

缺点:每次提交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间

应用场景:适合作业比较少的场景、大作业的场景

3.操作

1.关闭yarn的内存检查

vim /export/server/hadoop/etc/h

添加:

<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
 <value>false</value>
</property>
<property>
 <name>yarn.nodemanager.vmem-check-enabled</name>
 <value>false</value>
</property>

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

此处需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job。

2.同步

cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@第二台虚拟机IP:$PWD
scp -r yarn-site.xml root@第三台虚拟机IP:$PWD

3.重启YARN

/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh

4 测试

         1)、Session 会话模式

yarn-session.sh(开辟资源) + flink run(提交任务)

1.在yarn上启动一个Flink会话,node1上执行以下命令

/export/server/flink/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2

说明:
# -tm 表示每个TaskManager的内存大小
# -s 表示每个TaskManager的slots数量
# -d 表示以后台程序方式运行
附录:yarn-session 命令使用帮助

[root@node1 ~]# /export/server/flink/bin/yarn-session.sh --help
Usage:
 Optional
 -at,--applicationType <arg> Set a custom application type for the application on YARN
 -D <property=value> use value for given property
 -d,--detached If present, runs the job in detached mode
 -h,--help Help for the Yarn session CLI.
 -id,--applicationId <arg> Attach to running YARN session
 -j,--jar <arg> Path to Flink jar file
 -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
 -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to 
connect to a different JobManager than the one specified in the configuration.
 -nl,--nodeLabel <arg> Specify YARN node label for the YARN application
 -nm,--name <arg> Set a custom name for the application on YARN
 -q,--query Display available YARN resources (memory, cores)
 -qu,--queue <arg> Specify YARN queue.
 -s,--slots <arg> Number of slots per TaskManager
 -t,--ship <arg> Ship files in the specified directory (t for transfer)
 -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
 -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific 
option instead)
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

2.查看UI界面

http://第一台虚拟机IP:8088/cluster

3.使用flink run提交任务:

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \\
--input hdfs://node1.itcast.cn:8020/wordcount/input

运行完之后可以继续运行其他的小任务

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar


4.通过上方的ApplicationMaster可以进入Flink的管理界面

5.关闭yarn-session:

yarn application -kill application_1599402747874_0001

删除运行临时文件

rm -rf /tmp/.yarn-properties-root

         2)、Job 分离模式

1.直接提交job

/export/server/flink/bin/flink run \\
-m yarn-cluster -yjm 1024 -ytm 1024
/export/server/flink/examples/batch/WordCount.jar

# -m jobmanager的地址
# -yjm 1024 指定jobmanager的内存信息
# -ytm 1024 指定taskmanager的内存信息
2.查看UI界面
http://第一台虚拟机IP:8088/cluster

 

3.注意:
在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,如果报错需要
删除:【/tmp/.yarn-properties-root】

rm -rf /tmp/.yarn-properties-root​​​​​​​

因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

5 参数总结

//参数将就着看,执行命令flink --help 就可以查看,不需要记,不会就查
[root@node1 bin]# flink --help
./flink <ACTION> [OPTIONS] [ARGUMENTS]
The following actions are available:
Action "run" compiles and runs a program.
 Syntax: run [OPTIONS] <jar-file> <arguments>
 "run" action options:
 -c,--class <classname> Class with the program entry point("main()" method). Only needed if the JAR file does not specify the class in its manifest.

 -C,--classpath <url> Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes 
 (e.g. by means of a NFS share). You can use this option multiple times for specifying
more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.

 -d,--detached If present, runs the job in detached
 mode
 -n,--allowNonRestoredState Allow to skip savepoint state that cannot be restored. You need to allow his if you removed an operator from your program that was part of the
 program when the savepoint was triggered.

 -p,--parallelism <parallelism> The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.

 -py,--python <pythonFile> Python script with the program entry point. The dependent resources can be configured with the `--pyFiles`
option.
 -pyarch,--pyArchives <arg> Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a name can directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files
used in Python UDF (e.g.: --pyArchives  file:///tmp/py37.zip,file:///tmp/data.
 zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF,e.g.: f = open('data/data.txt', 'r').

 -pyexec,--pyExecutable <arg> Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.15.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets
the above requirements.

 -pyfs,--pyFiles <pythonFiles> Attach custom python files for job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. The
standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. Comma (',') could beused as the separator to specify multiple files (e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$na menode_address/myresource2.zip).

 -pym,--pyModule <pythonModule> Python module with the program entry point. This option must be used in conjunction with `--pyFiles`.

 -pyreq,--pyRequirements <arg> Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).

 -s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

 -sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.

 Options for yarn-cluster mode:
 -d,--detached If present, runs the job in detached mode

 -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.

 -yat,--yarnapplicationType <arg> Set a custom application type for the application on YARN

 -yD <property=value> use value for given property

 -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN
specific option instead)

 -yh,--yarnhelp Help for the Yarn session CLI.

 -yid,--yarnapplicationId <arg> Attach to running YARN session

 -yj,--yarnjar <arg> Path to Flink jar file

 -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)

 -ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN application

 -ynm,--yarnname <arg> Set a custom name for the application on YARN

 -yq,--yarnquery Display available YARN resources (memory, cores)

 -yqu,--yarnqueue <arg> Specify YARN queue.

 -ys,--yarnslots <arg> Number of slots per TaskManager

 -yt,--yarnship <arg> Ship files in the specified directory (t for transfer)

 -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)

 -yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

 Options for executor mode:
 -D <property=value> Generic configuration options for execution/deployment and for the configured executor. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
 e/ops/config.html
 -e,--executor <arg> The name of the executor to be used for executing the
 given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
 Options for default mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
different JobManager than the one specified
 in the configuration.
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "info" shows the optimized execution plan of the program (JSON).
 Syntax: info [OPTIONS] <jar-file> <arguments>
 "info" action options:
 -c,--class <classname> Class with the program entry point
 ("main()" method). Only needed if the JAR
 file does not specify the class in its
 manifest.
 -p,--parallelism <parallelism> The parallelism with which to run the
 program. Optional flag to override the
 default value specified in the
 configuration.
Action "list" lists running and scheduled programs.
 Syntax: list [OPTIONS]
 "list" action options:
 -a,--all Show all programs and their JobIDs
 -r,--running Show only running programs and their JobIDs
 -s,--scheduled Show only scheduled programs and their JobIDs
 Options for yarn-cluster mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to
 which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
 -yid,--yarnapplicationId <arg> Attach to running YARN session
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
 sub-paths for high availability mode
 Options for executor mode:
 -D <property=value> Generic configuration options for
 execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
 e/ops/config.html
 -e,--executor <arg> The name of the executor to be used for executing the
 given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
 Options for default mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
different JobManager than the one specified
 in the configuration.
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
 for high availability mode
Action "stop" stops a running program with a savepoint (streaming jobs only).
 Syntax: stop [OPTIONS] <Job ID>
 "stop" action options:
 -d,--drain Send MAX_WATERMARK before taking the
 savepoint and stopping the pipelne.
-p,--savepointPath <savepointPath> Path to the savepoint (for example
 hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
("state.savepoints.dir").
 Options for yarn-cluster mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to
 which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
 -yid,--yarnapplicationId <arg> Attach to running YARN session
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
 sub-paths for high availability mode
 Options for executor mode:
 -D <property=value> Generic configuration options for
 execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
 e/ops/config.html
 -e,--executor <arg> The name of the executor to be used for executing the
 given job, which is equivalent to the
"execution.target" config option. The currently
 available executors are: "remote", "local",
 "kubernetes-session", "yarn-per-job", "yarn-session".
 Options for default mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
 different JobManager than the one specified
in the configuration.
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
 for high availability mode
Action "cancel" cancels a running program.
 Syntax: cancel [OPTIONS] <Job ID>
 "cancel" action options:
 -s,--withSavepoint <targetDirectory> **DEPRECATION WARNING**: Cancelling
 a job with savepoint is deprecated.
Use "stop" instead.
 Trigger savepoint and cancel job.
 The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
 Options for yarn-cluster mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to
 which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
 -yid,--yarnapplicationId <arg> Attach to running YARN session
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
 sub-paths for high availability mode
 Options for executor mode:
-D <property=value> Generic configuration options for
 execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
 e/ops/config.html
 -e,--executor <arg> The name of the executor to be used for executing the
 given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
 Options for default mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
 for high availability mode
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
 "savepoint" action options:
 -d,--dispose <arg> Path of savepoint to dispose.
 -j,--jarfile <jarfile> Flink program JAR file.
 Options for yarn-cluster mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to
 which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
 -yid,--yarnapplicationId <arg> Attach to running YARN session
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
 sub-paths for high availability mode
 Options for executor mode:
 -D <property=value> Generic configuration options for
 execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
 e/ops/config.html
 -e,--executor <arg> The name of the executor to be used for executing the
 given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
 Options for default mode:
 -m,--jobmanager <arg> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
 -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
 for high availability mode

以上是关于从0到1Flink的成长之路的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(十六)

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路(十三)