Flink 1.12.2 startup scripts

Posted by 九师兄


1. Overview

Reposted from: https://zhangboyi.blog.csdn.net/article/details/114631783. It can be removed if it infringes any rights.

2. Call relationships

Service                    Entry class                                                                   Remarks
taskexecutor               org.apache.flink.runtime.taskexecutor.TaskManagerRunner                       xxx
standalonesession          org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint        xxx
standalonejob              org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint  xxx
historyserver              org.apache.flink.runtime.webmonitor.history.HistoryServer                     xxx
zookeeper                  org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer                   xxx
flink run -t yarn-per-job  org.apache.flink.client.cli.CliFrontend                                       Submits a job in yarn-per-job mode
yarn-session.sh            org.apache.flink.yarn.cli.FlinkYarnSessionCli                                 Entry point for YARN session mode

II. start-cluster.sh

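start-cluster.sh is Flink's cluster startup script, so let's look at what it does.
It performs three steps:

  • It first sources config.sh to load the functions and configuration it needs.
  • It starts the JobManager instance(s).
  • It starts the TaskManager instance(s).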

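2.1. Loading the global configuration and functions: config.sh

config.sh mainly defines environment variables and a set of shared helper functions.

Environment variables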

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

readMasters reads the conf/masters file to find the hosts on which the masters run (and, optionally, their web UI ports).

readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'."
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOSTWEBUIPORT=$( extractHostName $line)

        if [ -n "$HOSTWEBUIPORT" ]; then
            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
            MASTERS+=(${HOST})

            if [ -z "$WEBUIPORT" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=(${WEBUIPORT})
            fi

            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}
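
To make the host/port split in readMasters concrete, here is a minimal sketch of what the two cut calls do with a single entry. parse_master_entry is a hypothetical helper written for this illustration, not a function in config.sh.

```bash
#!/usr/bin/env bash
# Split one masters entry into host and optional web UI port,
# mirroring the cut-based logic in readMasters.
# parse_master_entry is a hypothetical helper, not part of config.sh.
parse_master_entry() {
    local entry=$1
    HOST=$(echo "$entry" | cut -f1 -d:)
    # -s suppresses lines without a delimiter, so a bare host yields an empty port
    WEBUIPORT=$(echo "$entry" | cut -s -f2 -d:)
    # readMasters stores 0 when no port is given
    if [ -z "$WEBUIPORT" ]; then
        WEBUIPORT=0
    fi
}

parse_master_entry "localhost:8081"
echo "host=$HOST port=$WEBUIPORT"

parse_master_entry "node1"
echo "host=$HOST port=$WEBUIPORT"
```

The second call shows why the -s flag matters: without it, cut would return the whole line for the port field when no colon is present.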

readWorkers reads the conf/workers file to find the hosts on which the workers run.

readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'."
        exit 1
    fi

    WORKERS=()

    WORKERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOST=$( extractHostName $line)
        if [ -n "$HOST" ] ; then
            WORKERS+=(${HOST})
            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}
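
Both readMasters and readWorkers use the `read line || GOON=false` loop rather than a plain `while read`. A likely reason is that the last line is still processed when the file does not end with a newline. The sketch below compares the two loop styles on a temporary file; the file contents and variable names are made up for the demonstration.

```bash
#!/usr/bin/env bash
# Compare a plain `while read` loop with the GOON-style loop from config.sh
# on a file whose last line has no trailing newline.
demo_file=$(mktemp)
printf 'worker1\nworker2' > "$demo_file"   # note: no newline after worker2

PLAIN=()
while read -r line; do
    PLAIN+=("$line")
done < "$demo_file"

GOON_STYLE=()
GOON=true
while $GOON; do
    # read fails at end of file but still fills $line with the partial last line
    read -r line || GOON=false
    if [ -n "$line" ]; then
        GOON_STYLE+=("$line")
    fi
done < "$demo_file"

rm -f "$demo_file"
echo "plain loop: ${#PLAIN[@]} host(s)"
echo "GOON loop:  ${#GOON_STYLE[@]} host(s)"
```

The plain loop drops worker2 because read returns a non-zero status on the unterminated last line, while the GOON-style loop runs its body one more time before stopping.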

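2.2. Starting the JobManager

In HA mode, the script loops over the masters listed in conf/masters and starts each one.
If all masters are local, it calls jobmanager.sh directly; otherwise it runs jobmanager.sh remotely over ssh.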

for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
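
Note that the branch is chosen by MASTERS_ALL_LOCALHOST, so a single non-local entry sends every master through ssh, including localhost. The following dry-run sketch prints the command that would be used in each case instead of running it. plan_master_command is a hypothetical helper, the FLINK_BIN_DIR value is only an example path, and FLINK_SSH_OPTS is left out for brevity.

```bash
#!/usr/bin/env bash
# Dry run: print the command that would start one master, without executing it.
# plan_master_command is a hypothetical helper; the path below is an example.
FLINK_BIN_DIR="/opt/flink/bin"

plan_master_command() {
    local all_local=$1 host=$2 port=$3
    if [ "$all_local" = true ]; then
        # all masters are localhost/127.0.0.1: call the script directly
        echo "${FLINK_BIN_DIR}/jobmanager.sh start ${host} ${port}"
    else
        # at least one master is remote: every master is started over ssh
        echo "ssh -n ${host} -- nohup /bin/bash -l ${FLINK_BIN_DIR}/jobmanager.sh start ${host} ${port} &"
    fi
}

plan_master_command true  localhost 8081
plan_master_command false node1     8081
```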

Contents of the masters configuration file:

localhost:8081

2.3. Starting the TaskManager instances

In start-cluster.sh, the code that starts the TaskManager instances is a single line.

TMWorkers start

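The corresponding function is defined in config.sh.
If all workers are local, it calls taskmanager.sh directly; otherwise it runs taskmanager.sh remotely, using pdsh when available and plain ssh when not.

# starts or stops TMs on all workers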
# TMWorkers start|stop
TMWorkers() {
    CMD=$1

    readWorkers

    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}
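
The pdsh branch passes all workers as one comma-separated list, built with the `$(IFS=, ; echo "${WORKERS[*]}")` idiom. A small sketch of that idiom in isolation, with made-up host names:

```bash
#!/usr/bin/env bash
# Join an array into a comma-separated list, as TMWorkers does for `pdsh -w`.
# The host names are placeholders.
WORKERS=(node1 node2 node3)

# The command substitution runs in a subshell, so the IFS change stays local;
# "${WORKERS[*]}" joins the elements with the first character of IFS.
WORKER_LIST=$(IFS=, ; echo "${WORKERS[*]}")

echo "$WORKER_LIST"
```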

Contents of the workers configuration file:

localhost

2.4. Complete code

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 1. Load the shared configuration properties and functions
. "$bin"/config.sh

# 2. Start the JobManager instance(s)

# Tell the shell to ignore case when matching strings
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    # Read the conf/masters file to get the master hosts
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# 3. Start TaskManager instance(s)
TMWorkers start
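
The `shopt -s nocasematch` call is what lets values such as ZooKeeper or ZOOKEEPER select the HA branch. The sketch below isolates that comparison. is_zookeeper_ha is a hypothetical helper, and the sample values are illustrative only.

```bash
#!/usr/bin/env bash
# Case-insensitive check of the high-availability mode, as in start-cluster.sh.
# is_zookeeper_ha is a hypothetical helper that isolates the comparison.
is_zookeeper_ha() {
    local value=$1 result=1
    shopt -s nocasematch          # [[ == ]] now ignores case
    if [[ $value == "zookeeper" ]]; then
        result=0
    fi
    shopt -u nocasematch          # restore case-sensitive matching
    return $result
}

for mode in zookeeper ZooKeeper NONE; do
    if is_zookeeper_ha "$mode"; then
        echo "$mode -> HA branch (readMasters, then one jobmanager.sh per master)"
    else
        echo "$mode -> single JobManager on this machine"
    fi
done
```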

III. jobmanager.sh

jobmanager.sh is the script that starts and stops the JobManager service.
Usage:

Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

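When jobmanager.sh runs, it first sources config.sh to load the functions and configuration it needs.
It then assembles the arguments and finally calls flink-daemon.sh.
Example of the resulting command: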

${FLINK_HOME}/bin/flink-daemon.sh 
    start 
    standalonesession 
    --configDir /opt/tools/flink-1.12.2/conf 
    --executionMode cluster 
    -D jobmanager.memory.off-heap.size=134217728b 
    -D jobmanager.memory.jvm-overhead.min=201326592b 
    -D jobmanager.memory.jvm-metaspace.size=268435456b 
    -D jobmanager.memory.heap.size=1073741824b 
    -D jobmanager.memory.jvm-overhead.max=201326592b
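
The -D values in the sample are plain byte counts, which are hard to read at a glance. As a quick sanity check, the sketch below converts them to MiB and adds up the four components. to_mib is a hypothetical helper, and counting jvm-overhead once (min and max are equal in the sample) is an assumption of this illustration.

```bash
#!/usr/bin/env bash
# Convert the byte values from the sample command into MiB and add them up.
# to_mib is a hypothetical helper; it expects values such as "134217728b".
to_mib() {
    local bytes=${1%b}            # strip the trailing "b" unit
    echo $(( bytes / 1024 / 1024 ))
}

off_heap=$(to_mib 134217728b)     # jobmanager.memory.off-heap.size
overhead=$(to_mib 201326592b)     # jobmanager.memory.jvm-overhead.min / .max
metaspace=$(to_mib 268435456b)    # jobmanager.memory.jvm-metaspace.size
heap=$(to_mib 1073741824b)        # jobmanager.memory.heap.size

total=$(( off_heap + overhead + metaspace + heap ))
echo "off-heap=${off_heap} overhead=${overhead} metaspace=${metaspace} heap=${heap} total=${total} MiB"
```

With the sample values this gives 128, 192, 256 and 1024 MiB, for a total of 1600 MiB.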

Complete code:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        args+=(${DYNAMIC_PARAMETERS[@]})
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
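    # Debugging change in this copy: print the final command instead of running it.
    echo "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
    # The original invocation, commented out here:
    # "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"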
fi
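
The argument handling in jobmanager.sh boils down to starting from a fixed array and appending optional pairs only when a value was passed. A minimal sketch of that pattern follows. build_jm_args is a hypothetical helper and the conf path is an example value.

```bash
#!/usr/bin/env bash
# Build the JobManager argument list with optional --host and --webui-port.
# build_jm_args is a hypothetical helper; /opt/flink/conf is an example path.
build_jm_args() {
    local conf_dir=$1 host=$2 webui_port=$3
    JM_ARGS=("--configDir" "$conf_dir" "--executionMode" "cluster")
    if [ -n "$host" ]; then
        JM_ARGS+=("--host" "$host")
    fi
    if [ -n "$webui_port" ]; then
        JM_ARGS+=("--webui-port" "$webui_port")
    fi
}

build_jm_args "/opt/flink/conf" "" ""
echo "no host/port:   ${JM_ARGS[*]}"

build_jm_args "/opt/flink/conf" "localhost" "8081"
echo "with host/port: ${JM_ARGS[*]}"
```

Keeping the arguments in an array rather than a string means each element is passed through as one word when expanded with "${JM_ARGS[@]}", even if a path contains spaces.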

IV. taskmanager.sh

taskmanager.sh is the script that starts and stops the TaskManager service.
Usage:

Usage: taskmanager.sh (start|start-foreground|stop|stop-all)

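When taskmanager.sh runs, it first sources config.sh to load the functions and configuration it needs.
It then assembles the arguments and finally calls flink-daemon.sh.
Example of the resulting command: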

${FLINK_HOME}/bin/flink-daemon.sh 
    start taskexecutor 
    --configDir /opt/tools/flink-1.12.2/conf 
    -D taskmanager.memory.framework.off-heap.size=134217728b 
    -D taskmanager.memory.network.max=134217730b 
    -D taskmanager.memory.network.min=134217730b 
    -D taskmanager.memory.framework.heap.size=134217728b 
    -D taskmanager.memory.managed.size=536870920b 
    -D taskmanager.cpu.cores=1.0 
    -D taskmanager.memory.task.heap.size=402653174b 
    -D taskmanager.memory.task.off-heap.size=0b 
    -D taskmanager.memory.jvm-metaspace.size=268435456b 
    -D taskmanager.memory.jvm-overhead.max=201326592b 
    -D taskmanager.memory.jvm-overhead.min=201326592b
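
To see how the individual TaskManager memory components in the sample add up, the sketch below sums the byte-valued options. sum_memory_bytes is a hypothetical helper. Passing network and jvm-overhead only once each (their min and max are equal in the sample) is an assumption of this illustration.

```bash
#!/usr/bin/env bash
# Sum the byte-valued memory components from the sample taskexecutor command.
# sum_memory_bytes is a hypothetical helper; options whose value is not of the
# form <digits>b (for example cpu.cores=1.0) are skipped.
sum_memory_bytes() {
    local total=0 kv value
    for kv in "$@"; do
        value=${kv#*=}
        if [[ $value =~ ^([0-9]+)b$ ]]; then
            total=$(( total + ${BASH_REMATCH[1]} ))
        fi
    done
    echo "$total"
}

TM_BYTES=$(sum_memory_bytes \
    taskmanager.memory.framework.off-heap.size=134217728b \
    taskmanager.memory.network.max=134217730b \
    taskmanager.memory.framework.heap.size=134217728b \
    taskmanager.memory.managed.size=536870920b \
    taskmanager.cpu.cores=1.0 \
    taskmanager.memory.task.heap.size=402653174b \
    taskmanager.memory.task.off-heap.size=0b \
    taskmanager.memory.jvm-metaspace.size=268435456b \
    taskmanager.memory.jvm-overhead.max=201326592b)

TM_MIB=$(( TM_BYTES / 1024 / 1024 ))
echo "total: ${TM_BYTES} bytes = ${TM_MIB} MiB"
```

For the sample values the components add up to 1811939328 bytes, which is 1728 MiB.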

Complete code:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if no other JVM options are set, set the GC to G1
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters

    java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")

    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    params_output=$(extractExecutionResults "${java_utils_output}" 2)

    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    jvm_params=$(echo "${params_output}" | head -n 1)
    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

    IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
    ARGS=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]} "${ARGS[@]}")

    export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

TM_RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $dynamic_configs
logs: $logging_output
"
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
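        # Start a single TaskManager
        # Debugging change in this copy: print the final command instead of running it.
        echo "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        # The original invocation, commented out here:
        # "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"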
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
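
The NUMA branch relies on `read -ra` to split the `nodebind:` line into words and on `"${NODE_LIST[@]:1}"` to skip the label. The sketch below replays that parsing on the sample line quoted in the script's comments, so it does not need numactl installed. The echoed command is shortened for illustration.

```bash
#!/usr/bin/env bash
# Parse a `nodebind:` line the way taskmanager.sh does, using the sample
# output from the script's comments instead of calling numactl.
sample_line="nodebind: 0 1"

read -ra NODE_LIST <<< "$sample_line"

# Element 0 is the "nodebind:" label, so the node IDs start at index 1.
for NODE_ID in "${NODE_LIST[@]:1}"; do
    echo "would run: numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- flink-daemon.sh start taskexecutor"
done
```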

V. flink-daemon.sh

flink-daemon.sh is the common script used to start taskexecutor, zookeeper, historyserver, standalonesession, and standalonejob.

Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]

Service            Entry class                                                                   Remarks
taskexecutor       org.apache.flink.runtime.taskexecutor.TaskManagerRunner                       xxx
standalonesession  org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint        xxx
standalonejob      org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint  xxx
historyserver      org.apache.flink.runtime.webmonitor.history.HistoryServer                     xxx
zookeeper          org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer                   xxx
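
The table above amounts to a lookup from daemon name to entry class. One way to express such a lookup in bash is a case statement, sketched below. entrypoint_class is a hypothetical helper written for this illustration and should not be read as the actual contents of flink-daemon.sh.

```bash
#!/usr/bin/env bash
# Map a daemon name to its entry class, following the table above.
# entrypoint_class is a hypothetical helper, not the real flink-daemon.sh code.
entrypoint_class() {
    case $1 in
        taskexecutor)      echo "org.apache.flink.runtime.taskexecutor.TaskManagerRunner" ;;
        standalonesession) echo "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint" ;;
        standalonejob)     echo "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint" ;;
        historyserver)     echo "org.apache.flink.runtime.webmonitor.history.HistoryServer" ;;
        zookeeper)         echo "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer" ;;
        *)                 echo "Unknown daemon '$1'" >&2; return 1 ;;
    esac
}

entrypoint_class standalonesession
entrypoint_class taskexecutor
```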

5.1. JobManager start command

${JAVA_HOME}/bin/java  
-Xmx1073741824 
-Xms1073741824 
-XX:MaxMetaspaceSize=268435456


# Logging-related options
-Dlog.file=${FLINK_HOME}/log/flink-sysadmin-standalonesession-1-BoYi-Pro.local.log 
-Dlog4j.configuration=file:${FLINK_HOME}/conf/log4j.properties 
-Dlog4j.configurationFile=file:${FLINK_HOME}/conf/log4j.properties 
-Dlogback.configurationFile=file:${FLINK_HOME}/conf/logback.xml

# Classpath-related options
-
