Flink 1.15 Source Code Analysis: Startup Scripts: start-cluster.sh
Posted by 宝哥大数据
Back to the [Flink 1.15 Source Code Analysis index](https://blog.csdn.net/wuxintdrh/article/details/127796678)
1. start-cluster.sh
[root@chb1 bin]# cat start-cluster.sh
#!/usr/bin/env bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# Load the configuration (variables and helper functions)
. "$bin"/config.sh

# Start the JobManager instance(s)
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA mode: readMasters is a function defined in config.sh
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    # Start one JobManager per master
    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # All masters are localhost: start locally
        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        # Otherwise start the remote JobManager over ssh
        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s)
TMWorkers start
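The overall flow is:
- 1. Source config.sh, which sets the environment variables and defines the functions called later.
- 2. Start the JobManager(s).
  - In HA mode (HIGH_AVAILABILITY is zookeeper), call readMasters from config.sh to get the list of masters, then start a JobManager for each one in turn:
    - If all masters are localhost, call jobmanager.sh directly: "$FLINK_BIN_DIR"/jobmanager.sh start "$master" "$webuiport"
    - Otherwise, start jobmanager.sh on the remote host over ssh.
  - Otherwise, start a single JobManager on the local machine.
- 3. Start the TaskManagers.
  - TMWorkers start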
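The HA check in step 2 is wrapped in shopt -s nocasematch / shopt -u nocasematch, so the configured value matches regardless of case. Below is a minimal sketch of that behaviour. The helper name cluster_mode is made up for illustration and is not part of Flink.

```shell
#!/usr/bin/env bash
# Sketch only: cluster_mode is a hypothetical helper, not part of Flink.
# It repeats the comparison start-cluster.sh makes on HIGH_AVAILABILITY.
cluster_mode() {
    local high_availability=$1
    local mode="single"

    shopt -s nocasematch          # make [[ == ]] ignore case
    if [[ $high_availability == "zookeeper" ]]; then
        mode="ha"
    fi
    shopt -u nocasematch          # restore case-sensitive matching

    echo "$mode"
}

cluster_mode "ZooKeeper"   # prints: ha
cluster_mode "NONE"        # prints: single
```

Without nocasematch, a configuration value such as "ZooKeeper" would fall through to the single-JobManager branch.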
1.1 config.sh
This file is long. It defines the variables and functions that the other scripts rely on.
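One of those functions is readMasters, which start-cluster.sh relies on to fill MASTERS, WEBUIPORTS and MASTERS_ALL_LOCALHOST. The sketch below is not the real function. It is a simplified stand-in named parse_masters, written under the assumption that each line of the masters file is either "host" or "host:port". The host chb2 and the use of 0 for a missing port are also assumptions.

```shell
#!/usr/bin/env bash
# Sketch only: parse_masters is a hypothetical stand-in for readMasters.
# Assumption: each non-empty line of the masters file is "host" or "host:port".
parse_masters() {
    local masters_file=$1 line host port

    MASTERS=()
    WEBUIPORTS=()
    MASTERS_ALL_LOCALHOST=true

    while IFS= read -r line || [[ -n "$line" ]]; do
        if [[ -z "$line" ]]; then continue; fi
        host=${line%%:*}
        # No colon means no port was given; 0 is used here as a placeholder.
        if [[ "$line" == *:* ]]; then port=${line#*:}; else port=0; fi

        MASTERS+=("$host")
        WEBUIPORTS+=("$port")
        if [[ "$host" != "localhost" && "$host" != "127.0.0.1" ]]; then
            MASTERS_ALL_LOCALHOST=false
        fi
    done < "$masters_file"
}

# Build a throwaway masters file; chb2 is a made-up host name.
tmp=$(mktemp)
printf 'localhost:8081\nchb2:8082\n' > "$tmp"
parse_masters "$tmp"
rm -f "$tmp"

echo "${#MASTERS[@]} masters, all local: $MASTERS_ALL_LOCALHOST"
for ((i = 0; i < ${#MASTERS[@]}; ++i)); do
    echo "${MASTERS[i]} -> ${WEBUIPORTS[i]}"
done
```

The two parallel arrays are what the for loop in start-cluster.sh indexes with the same i.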
1.2 jobmanager.sh
[root@chb1 bin]# cat jobmanager.sh
#!/usr/bin/env bash
# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Note this value: a cluster started through start-cluster.sh is a standalonesession cluster
ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        args+=(${DYNAMIC_PARAMETERS[@]})
    fi
fi

# Check whether to start in the foreground
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    # Start through flink-daemon.sh, passing ENTRYPOINT=standalonesession as an argument
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
An example of the command that ends up being executed:
# Split across several lines for readability
/uardata1/soft/flink-1.15.2/bin/flink-daemon.sh start standalonesession \
    --configDir /uardata1/soft/flink-1.15.2/conf \
    --executionMode cluster \
    -D jobmanager.memory.off-heap.size=134217728b \
    -D jobmanager.memory.jvm-overhead.min=201326592b \
    -D jobmanager.memory.jvm-metaspace.size=268435456b \
    -D jobmanager.memory.heap.size=1073741824b \
    -D jobmanager.memory.jvm-overhead.max=201326592b
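To see how the optional host and web UI port end up on that command line, here is a small dry-run sketch of the array-building pattern used in jobmanager.sh. The helper build_jm_args and the path /opt/flink/conf are placeholders invented for the example. Nothing is actually started.

```shell
#!/usr/bin/env bash
# Sketch only: build_jm_args is a hypothetical helper that mirrors how
# jobmanager.sh assembles its args array. Nothing is started here.
build_jm_args() {
    local conf_dir=$1 host=$2 webui_port=$3

    args=("--configDir" "$conf_dir" "--executionMode" "cluster")
    # Optional values are appended only when they are non-empty.
    if [ -n "$host" ]; then
        args+=("--host" "$host")
    fi
    if [ -n "$webui_port" ]; then
        args+=("--webui-port" "$webui_port")
    fi
}

# /opt/flink/conf is a placeholder path.
build_jm_args "/opt/flink/conf" "chb1" "8081"
echo "flink-daemon.sh start standalonesession ${args[*]}"

build_jm_args "/opt/flink/conf" "" ""
echo "flink-daemon.sh start standalonesession ${args[*]}"
```

Keeping the arguments in an array and expanding it as "${args[@]}" preserves each argument as one word, even if a path contains spaces.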
The most important line in this script is ENTRYPOINT=standalonesession, which fixes the cluster mode. It also tells us that a cluster started with start-cluster.sh is a standalone session cluster.
After setting ENTRYPOINT, the script calls flink-daemon.sh and passes the ENTRYPOINT value to it. We will look at flink-daemon.sh shortly. First, let's see what taskmanager.sh does.
1.3 taskmanager.sh
In start-cluster.sh, the TaskManagers are started by TMWorkers start:
# Start TaskManager instance(s)
TMWorkers start
This calls the TMWorkers() function defined in config.sh:
# starts or stops TMs on all workers
# TMWorkers start|stop
TMWorkers() {
    CMD=$1

    # Read the list of workers
    readWorkers

    # Are all workers local nodes?
    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}
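The pdsh branch builds its -w host list with $(IFS=, ; echo "${WORKERS[*]}"). The sketch below isolates that idiom. The function name join_workers and the host names are made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: join_workers is a hypothetical helper; the host names are made up.
join_workers() {
    local workers=("$@")
    # "${workers[*]}" joins the elements with the first character of IFS.
    # The subshell keeps the IFS change from leaking into the caller.
    (IFS=, ; echo "${workers[*]}")
}

join_workers chb1 chb2 chb3   # prints: chb1,chb2,chb3
```

Note the [*] subscript: with [@] the elements would stay separate words and IFS would not be used to join them.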
taskmanager.sh does quite a lot, but two points matter most:
1. It sets ENTRYPOINT to taskexecutor.
2. It calls flink-daemon.sh and passes ENTRYPOINT to it.
#!/usr/bin/env bash
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Note the value of this variable
ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # if no other JVM options are set, set the GC to G1
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    parseTmArgsAndExportLogs "${ARGS[@]}"

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi
    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            # Again this calls flink-daemon.sh, passing ENTRYPOINT=taskexecutor as an argument
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
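The NUMA branch turns the nodebind line of numactl --show into a list of node IDs. The following sketch shows that parsing step on a sample string, so it runs without numactl installed. The helper numa_nodes is hypothetical, and the sample text is shortened from the comment in the script.

```shell
#!/usr/bin/env bash
# Sketch only: numa_nodes is a hypothetical helper. It takes the text of
# `numactl --show` as an argument so it runs without numactl installed.
numa_nodes() {
    local show_output=$1 node_list
    # Split the "nodebind: 0 1" line into words: (nodebind: 0 1)
    read -ra node_list <<< "$(grep '^nodebind: ' <<< "$show_output")"
    # Drop element 0 (the "nodebind:" label) and keep the node IDs.
    echo "${node_list[@]:1}"
}

sample=$'policy: default\ncpubind: 0 1\nnodebind: 0 1\nmembind: 0 1'
for node_id in $(numa_nodes "$sample"); do
    echo "would start a TaskManager bound to NUMA node $node_id"
done
```

The slice "${node_list[@]:1}" is the same expansion taskmanager.sh uses to skip the label and loop over one TaskManager per node.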
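Here flink-daemon.sh is called once again, so let's look at what flink-daemon.sh does.
1.4 flink-daemon.sh
The script is long, but it has two main parts:
1.4.1 Choosing the entry class from the ENTRYPOINT argument
jobmanager.sh passes standalonesession to this script, and taskmanager.sh passes taskexecutor. That argument is what decides the entry class of the master node and of the worker nodes, respectively.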
case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '$DAEMON'. $USAGE."
        exit 1
    ;;
esac
1.4.2 Launching the JVM with the entry class
case $STARTSTOP in

    (start)
        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
            active=()
            while IFS='' read -r p || [[ -n "$p" ]]; do
                kill -0 $p >/dev/null 2>&1
                if [ $? -eq 0 ]; then
                    active+=($p)
                fi
            done < "${pid}"

            count="${#active[@]}"

            if [ ${count} -gt 0 ]; then
                echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
            fi
        fi

        # Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        # Launch the JVM
        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "${pid}"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;
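The "already running" warning relies on kill -0, which sends no signal and only reports whether a process exists. Here is a minimal sketch of that check against a throwaway pid file. The helper count_active is hypothetical, and 99999999 is assumed to be larger than any PID the system can assign.

```shell
#!/usr/bin/env bash
# Sketch only: count_active is a hypothetical helper mirroring the pid-file
# check in flink-daemon.sh.
count_active() {
    local pid_file=$1 p active=()
    while IFS='' read -r p || [[ -n "$p" ]]; do
        # kill -0 sends no signal; it only tests whether the process exists
        # (and that we are allowed to signal it).
        if kill -0 "$p" > /dev/null 2>&1; then
            active+=("$p")
        fi
    done < "$pid_file"
    echo "${#active[@]}"
}

pid_file=$(mktemp)
echo "$$" >> "$pid_file"        # this shell: certainly running
echo "99999999" >> "$pid_file"  # assumed to be above any real PID limit
count_active "$pid_file"        # prints: 1
rm -f "$pid_file"
```

Because the pid file is appended to on every successful start, it can hold several PIDs, which is why the script loops over it instead of reading a single value.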
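At this point we have found where the master and worker nodes are actually started, and we have also found their entry classes:
- The entry class of the master node is org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.
- The entry class of the worker nodes is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.
These two classes are the entry points from which we can start exploring the Flink source code.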