Flink 1.17 Flink-SQL-Gateway HiveServer2 源码分析

Posted EdwardsWang丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 1.17 Flink-SQL-Gateway HiveServer2 源码分析相关的知识,希望对你有一定的参考价值。

相关内容:Flink 1.13 源码解析 目录汇总

一、研究背景

最近公司想通过JDBC的方式使用flink sql gateway,Flink官方提供了HiveServer2的flink-sql-gateway使用方式,但是Flink官方这里使用的是HiveCatalog,是在代码里写死的不可配置的,由于业务上没有使用Hive的需求,考虑到想以最轻量化的方式使用该sql-gateway,于是选择将源码中的HiveCatalog修改为公司自研的Catalog。以下为在实现上述功能过程中对源码的分析。

二、源码分析

本次使用的Flink版本为Flink 1.17.0

2.1、寻找入口

首先根据官方文档可知,flink-sql-gateway的启动方式为执行 $FLINK_HOME/bin/sql-gateway.sh 脚本,我们来看该脚本的主要内容:
该脚本主要做了两件事:

  • 读取config.sh的配置
  • 执行SQL gateway的具体启动逻辑

以下是核心内容源码:

################################################################################
# SQL gateway specific logic
################################################################################

ENTRYPOINT=sql-gateway

if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then
usage
exit 0
fi

STARTSTOP=$1

if [ -z "$STARTSTOP" ]; then
STARTSTOP="start"
fi

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
usage
exit 1
fi

# ./sql-gateway.sh start --help, print the message to the console
if [[ "$STARTSTOP" = start* ]] && ( [[ "$*" = *--help* ]] || [[ "$*" = *-h* ]] ); then
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
"$JAVA_RUN"  -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "$@:2"
exit 0
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
exec "$FLINK_BIN_DIR"/flink-console.sh $ENTRYPOINT "$@:2"
else
"$FLINK_BIN_DIR"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "$@:2"
fi

在上述源码中,可以看到运行Jar包指定的主类为:org.apache.flink.table.gateway.SqlGateway,我们直接去源码中寻找该类的main方法即可找到SQL gateway的入口

2.2、HiveServer2 SQL gateway 启动流程源码分析

我们进入org.apache.flink.table.gateway.SqlGateway的main()方法内:

static void startSqlGateway(PrintStream stream, String[] args) 
    SqlGatewayOptions cliOptions = SqlGatewayOptionsParser.parseSqlGatewayOptions(args);

    if (cliOptions.isPrintHelp()) 
        SqlGatewayOptionsParser.printHelpSqlGateway(stream);
        return;
    

    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    // 加载配置
    DefaultContext defaultContext =
        DefaultContext.load(
            ConfigurationUtils.createConfiguration(cliOptions.getDynamicConfigs()),
            Collections.emptyList(),
            true,
            true);

    // 初始化SQL gateway
    SqlGateway gateway =
        new SqlGateway(
            defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));
    try 
        Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));

        // 启动
        gateway.start();
        gateway.waitUntilStop();
     catch (Throwable t) 
        // User uses ctrl + c to cancel the Gateway manually
        if (t instanceof InterruptedException) 
            LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
            return;
        
        // make space in terminal
        stream.println();
        stream.println();

        if (t instanceof SqlGatewayException) 
            // Exception that the gateway can not handle.
            throw (SqlGatewayException) t;
         else 
            LOG.error(
                "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
                t);
            throw new SqlGatewayException(
                "Unexpected exception. This is a bug. Please consider filing an issue.", t);
        
     finally 
        gateway.stop();
    

在上述代码中,主要做了几件事:

  • 启动日志服务
  • 加载和检查配置项
  • 初始化SQL gateway并启动

我们主要来看SQL gateway的初始化和启动流程,首先是初始化,我们主要关注如下代码:

// 初始化SQL gateway
SqlGateway gateway =
        new SqlGateway(
                defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));

SqlGateway的构造函数需要两个参数,分别是Flink的configuration和SessionManager的实例,此处SessionManager实例的构建中Flink使用的JDK1.8的特性,通过static关键字的修饰在SessionManager的接口内直接完成了SessionManagerImpl实例的构建并返回给SqlGateway的构造方法:

public interface SessionManager 

    /** Create the @link SessionManager with the default configuration. */
    static SessionManager create(DefaultContext defaultContext) 
        return new SessionManagerImpl(defaultContext);
    

    ... ...
    ... ...

在完成实例构建后,通过gateway.start(); 方法,开始启动SQL gateway:

// 初始化SQL gateway
SqlGateway gateway =
        new SqlGateway(
                defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));
try 
    Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
    // 启动
    gateway.start();
    gateway.waitUntilStop();
 catch (Throwable t) 
    ... ...
 finally 
    gateway.stop();

我们点进start()方法内:

public void start() throws Exception 
    // 启动SessionManager
    sessionManager.start();

    // 初始化SqlGatewayService对象,实际做事的对象
    SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);
    try 
        // 根据配置文件中配置的sql-gateway端点类型来选择是Rest Sql-gateway还是HiveServer2 Sql-gateway
        endpoints.addAll(
                SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
                        sqlGatewayService, defaultConfig));
        for (SqlGatewayEndpoint endpoint : endpoints) 
            // 启动
            endpoint.start();
        
     catch (Throwable t) 
        LOG.error("Failed to start the endpoints.", t);
        throw new SqlGatewayException("Failed to start the endpoints.", t);
    

在这里做了几件事:

  • 启动sessionManager
  • 初始化SqlGatewayService对象
  • 加载配置文件中配置的SQL-gateway类型,并根据对应的配置创建对应的端点
  • 启动端点

首先我们来看SessionManager的启动过程,点进sessionManager.start()方法:

@Override
public void start() 
    // 构建超时检查任务
    if (checkInterval > 0 && idleTimeout > 0) 
        cleanupService = Executors.newSingleThreadScheduledExecutor();
        timeoutCheckerFuture =
                cleanupService.scheduleAtFixedRate(
                        () -> 
                            LOG.debug(
                                    "Start to cleanup expired sessions, current session count: ",
                                    sessions.size());
                            for (Map.Entry<SessionHandle, Session> entry :
                                    sessions.entrySet()) 
                                SessionHandle sessionId = entry.getKey();
                                Session session = entry.getValue();
                                if (isSessionExpired(session)) 
                                    LOG.info("Session  is expired, closing it...", sessionId);
                                    closeSession(session);
                                
                            
                            LOG.debug(
                                    "Removing expired session finished, current session count: ",
                                    sessions.size());
                        ,
                        checkInterval,
                        checkInterval,
                        TimeUnit.MILLISECONDS);
    

    ReadableConfig conf = defaultContext.getFlinkConfig();
    
    // 构建算子任务线程池,该线程池主要用于执行实际sql-gateway的操作
    operationExecutorService =
            ThreadUtils.newThreadPool(
                    conf.get(SQL_GATEWAY_WORKER_THREADS_MIN),
                    conf.get(SQL_GATEWAY_WORKER_THREADS_MAX),
                    conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(),
                    OPERATION_POOL_NAME);

在SessionManager内部,Flink启动了一个超时检查任务,并构建了一个算子任务线程池,该线程池主要用于接受并提交sql-gateway的操作。

接下来看SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint()方法:

public static List<SqlGatewayEndpoint> createSqlGatewayEndpoint(
        SqlGatewayService service, Configuration configuration) 
    // 加载 sql-gateway.endpoint.type 配置
    List<String> identifiers = configuration.get(SQL_GATEWAY_ENDPOINT_TYPE);
    
    if (identifiers == null || identifiers.isEmpty()) 
        throw new ValidationException(
                String.format(
                        "Endpoint options do not contain an option key '%s' for discovering an endpoint.",
                        SQL_GATEWAY_ENDPOINT_TYPE.key()));
    
    // 验证参数
    validateSpecifiedEndpointsAreUnique(identifiers);
    
    List<SqlGatewayEndpoint> endpoints = new ArrayList<>();
    for (String identifier : identifiers) 
        // 根据字符串匹配对应的工厂类表示,Factory.factoryIdentifier() 获取字符标识
        // 例如 HiveServer2EndpointFactory 的标识 IDENTIFIER 为 hiveserver2
        final SqlGatewayEndpointFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        SqlGatewayEndpointFactory.class,
                        identifier);
        
        endpoints.add(
                factory.createSqlGatewayEndpoint(
                        new DefaultEndpointFactoryContext(
                                service,
                                configuration,
                                getEndpointConfig(configuration, identifier))));
    
    return endpoints;

在方法里主要是通过配置文件获取sql-gateway.endpoint.type配置项内容来获取endpoint类型,并根据该配置项内容的字符串与各个工厂类的标识进行遍历对比,来获取指定的工厂类,并创建出对应的实例。
例如我们在sql-gateway.endpoint.type中配置的为hiveServer2,则此处加载的工厂类为HiveServer2EndpointFactory,该工厂类创造出的实例为HiveServer2Endpoint。

最后Flink通过SqlGatewayEndpoint.start()方法启动对应的endpoint,此处我们调用的为HiveServer2Endpoint的start方法。
我们点进org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.start()方法内部:

@Override
public void start() throws Exception 
    buildTThreadPoolServer();
    serverThread.start();

首先,HiveServer2Endpoint实现了Runnable接口,这是一个线程,我们点开buildTThreadPoolServer方法:

private void buildTThreadPoolServer() 
    executor =
            ThreadUtils.newThreadPool(
                    minWorkerThreads,
                    maxWorkerThreads,
                    workerKeepAliveTime.toMillis(),
                    "hiveserver2-endpoint-thread-pool");
    try 
        server =
                new TThreadPoolServer(
                        new TThreadPoolServer.Args(new TServerSocket(socketAddress))
                                .processorFactory(
                                        new TProcessorFactory(
                                                new TCLIService.Processor<>(this)))
                                .transportFactory(new TTransportFactory())
                                // Currently, only support binary mode.
                                .protocolFactory(new TBinaryProtocol.Factory())
                                .inputProtocolFactory(
                                        new TBinaryProtocol.Factory(
                                                true, true, maxMessageSize, maxMessageSize))
                                .requestTimeout(requestTimeoutMs)
                                .requestTimeoutUnit(TimeUnit.MILLISECONDS)
                                .beBackoffSlotLength(backOffSlotLengthMs)
                                .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
                                .executorService(executor));
     catch (Exception e) 
        throw new SqlGatewayException("Failed to build the server.", e);
    

在这里,Flink构建了一个Hive thrift服务端,准备监听端口并准备建立session,没有太多可说的,我们返回上一级,继续看HiveServer2Endpoint.start()方法中的现场启动方法。
我们来看HiveServer2Endpoint的run方法:

@Override
public void run() 
    try 
        LOG.info("HiveServer2 Endpoint begins to listen on .", socketAddress.toString());
        server.serve();
     catch (Throwable t) 
        LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
        System.exit(-1);
    

在这里,启动我们刚才构建好的服务端,开始监听我们hive-site里配置的url。

2.3、HiveServer2 SQL gateway任务执行流程

因为使用了Hive 的 thrift,所以流程上和Hive thrift的调用回调流程差不多。
举个栗子,首先我们执行一段demo代码如下:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JDBCConnect 
    public static void main(String[] args) throws Exception
        try (
            // Please replace the JDBC URI with your actual host, port and database.
            Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default;auth=noSasl");
            Statement statement = connection.createStatement()) 
            statement.execute("CREATE TABLE testTable4 (name STRING, age INT)");

            statement.execute("SHOW TABLES");
            ResultSet resultSet = statement.getResultSet();
            while (resultSet.next()) 
                System.out.println(resultSet.getString(1));
            
        
    

代码执行后会以JDBC的方式通过thrift建立连接,此时会触发thrift中Server的OpenSession方法,我们进入HiveServer2Endpoint中,点进OpenSession方法:

@Override
public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException 
    LOG.debug("Client protocol version: .", tOpenSessionReq.getClient_protocol());
    TOpenSessionResp resp = new TOpenSessionResp();
    try 
        // negotiate connection protocol
        TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
        // the session version is not larger than the server version because of the
        // min(server_version, ...)
        HiveServer2EndpointVersion sessionVersion =
                HiveServer2EndpointVersion.valueOf(
                        TProtocolVersion.findByValue(
                                Math.min(
                                        clientProtocol.getValue(),
                                        SERVER_VERSION.getVersion().getValue())));
        // prepare session environment
        Map<String, String> originSessionConf =
                tOpenSessionReq.getConfiguration() == null
                        ? Collections.emptyMap()
                        : tOpenSessionReq.getConfiguration();
        HiveConf conf = 

Flink学习笔记:搭建Flink on Yarn环境并运行Flink应用

文章目录

以上是关于Flink 1.17 Flink-SQL-Gateway HiveServer2 源码分析的主要内容,如果未能解决你的问题,请参考以下文章

1.17

错误:“GraphQL 错误:无效版本:1.17”,同时部署到棱镜云 [关闭]

contest 1.17

KeepAlived1.1.17安装及配置说明

php mongdb driver 1.17

Week of 1.17