Flink 1.17 Flink-SQL-Gateway HiveServer2 Source Code Analysis
Posted by EdwardsWang丶
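1. Background
Recently our company wanted to use the Flink SQL Gateway over JDBC. Flink officially provides a HiveServer2 endpoint for flink-sql-gateway, but that endpoint uses HiveCatalog, which is hard-coded and not configurable. Our business has no need for Hive, and we wanted to run the sql-gateway in the most lightweight way possible, so we chose to replace the HiveCatalog in the source with our in-house Catalog. What follows is the source code analysis done while implementing that change.
2. Source Code Analysis
The Flink version used here is Flink 1.17.0.
2.1 Finding the Entry Point
According to the official documentation, flink-sql-gateway is started by running the $FLINK_HOME/bin/sql-gateway.sh script. Let's look at what the script does.
It mainly does two things:
- Reads the configuration from config.sh
- Runs the actual SQL gateway startup logic
The core of the script is shown below: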
################################################################################
# SQL gateway specific logic
################################################################################
ENTRYPOINT=sql-gateway
if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then
    usage
    exit 0
fi
STARTSTOP=$1
if [ -z "$STARTSTOP" ]; then
    STARTSTOP="start"
fi
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    usage
    exit 1
fi
# ./sql-gateway.sh start --help, print the message to the console
if [[ "$STARTSTOP" = start* ]] && ( [[ "$*" = *--help* ]] || [[ "$*" = *-h* ]] ); then
    FLINK_TM_CLASSPATH=`constructFlinkClassPath`
    SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
    "$JAVA_RUN" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "${@:2}"
    exit 0
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "$FLINK_BIN_DIR"/flink-console.sh $ENTRYPOINT "${@:2}"
else
    "$FLINK_BIN_DIR"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}"
fi
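In the script above, the main class passed to the JVM is org.apache.flink.table.gateway.SqlGateway, so the entry point of the SQL gateway is the main method of that class.
2.2 Startup Flow of the HiveServer2 SQL Gateway
Let's step into org.apache.flink.table.gateway.SqlGateway. The startup logic behind main() lives in startSqlGateway():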
static void startSqlGateway(PrintStream stream, String[] args) {
    SqlGatewayOptions cliOptions = SqlGatewayOptionsParser.parseSqlGatewayOptions(args);
    if (cliOptions.isPrintHelp()) {
        SqlGatewayOptionsParser.printHelpSqlGateway(stream);
        return;
    }
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);
    // Load the configuration
    DefaultContext defaultContext =
            DefaultContext.load(
                    ConfigurationUtils.createConfiguration(cliOptions.getDynamicConfigs()),
                    Collections.emptyList(),
                    true,
                    true);
    // Initialize the SQL gateway
    SqlGateway gateway =
            new SqlGateway(
                    defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));
    try {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
        // Start the gateway
        gateway.start();
        gateway.waitUntilStop();
    } catch (Throwable t) {
        // User uses ctrl + c to cancel the Gateway manually
        if (t instanceof InterruptedException) {
            LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
            return;
        }
        // make space in terminal
        stream.println();
        stream.println();
        if (t instanceof SqlGatewayException) {
            // Exception that the gateway can not handle.
            throw (SqlGatewayException) t;
        } else {
            LOG.error(
                    "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
                    t);
            throw new SqlGatewayException(
                    "Unexpected exception. This is a bug. Please consider filing an issue.", t);
        }
    } finally {
        gateway.stop();
    }
}
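The code above does the following:
- Runs the startup checks and logging
- Loads and checks the configuration
- Initializes the SQL gateway and starts it
We focus on the initialization and startup of the SQL gateway. Initialization comes first, and the relevant code is: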
// Initialize the SQL gateway
SqlGateway gateway =
        new SqlGateway(
                defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));
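The SqlGateway constructor takes two arguments: Flink's configuration and a SessionManager instance. To build the SessionManager, Flink uses a Java 8 feature: a static method declared directly on the SessionManager interface constructs a SessionManagerImpl and returns it to the SqlGateway constructor: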
public interface SessionManager {
    /** Create the {@link SessionManager} with the default configuration. */
    static SessionManager create(DefaultContext defaultContext) {
        return new SessionManagerImpl(defaultContext);
    }
    ... ...
}
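The static-factory-on-an-interface pattern can be sketched in isolation as follows. This is a minimal illustration, not Flink code: `Manager`, `ManagerImpl`, and `describe()` are hypothetical names chosen for the example.

```java
public class StaticFactoryDemo {
    /** Hypothetical stand-in for an interface such as SessionManager (not the Flink type). */
    interface Manager {
        String describe();

        // Since Java 8, an interface may declare static methods with a body,
        // so the interface itself can act as the factory for its default implementation.
        static Manager create(String name) {
            return new ManagerImpl(name);
        }
    }

    /** Hypothetical default implementation returned by the factory method. */
    static final class ManagerImpl implements Manager {
        private final String name;

        ManagerImpl(String name) {
            this.name = name;
        }

        @Override
        public String describe() {
            return "manager:" + name;
        }
    }

    public static void main(String[] args) {
        // The caller depends only on the interface; the concrete class stays hidden.
        Manager manager = Manager.create("default");
        System.out.println(manager.describe());
    }
}
```

The benefit of this design is that callers never name the implementation class, so it can be swapped without touching call sites.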
Once the instance has been built, the SQL gateway is started through gateway.start():
// Initialize the SQL gateway
SqlGateway gateway =
        new SqlGateway(
                defaultContext.getFlinkConfig(), SessionManager.create(defaultContext));
try {
    Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
    // Start the gateway
    gateway.start();
    gateway.waitUntilStop();
} catch (Throwable t) {
    ... ...
} finally {
    gateway.stop();
}
Let's step into the start() method:
public void start() throws Exception {
    // Start the SessionManager
    sessionManager.start();
    // Create the SqlGatewayService, the object that does the actual work
    SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);
    try {
        // Choose between the REST endpoint and the HiveServer2 endpoint
        // according to the endpoint type set in the configuration file
        endpoints.addAll(
                SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
                        sqlGatewayService, defaultConfig));
        for (SqlGatewayEndpoint endpoint : endpoints) {
            // Start the endpoint
            endpoint.start();
        }
    } catch (Throwable t) {
        LOG.error("Failed to start the endpoints.", t);
        throw new SqlGatewayException("Failed to start the endpoints.", t);
    }
}
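This method does the following:
- Starts the sessionManager
- Creates the SqlGatewayService object
- Reads the SQL gateway endpoint type from the configuration file and creates the matching endpoints
- Starts the endpoints
We begin with the SessionManager startup. Step into sessionManager.start():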
@Override
public void start() {
    // Build the session timeout check task
    if (checkInterval > 0 && idleTimeout > 0) {
        cleanupService = Executors.newSingleThreadScheduledExecutor();
        timeoutCheckerFuture =
                cleanupService.scheduleAtFixedRate(
                        () -> {
                            LOG.debug(
                                    "Start to cleanup expired sessions, current session count: {}",
                                    sessions.size());
                            for (Map.Entry<SessionHandle, Session> entry :
                                    sessions.entrySet()) {
                                SessionHandle sessionId = entry.getKey();
                                Session session = entry.getValue();
                                if (isSessionExpired(session)) {
                                    LOG.info("Session {} is expired, closing it...", sessionId);
                                    closeSession(session);
                                }
                            }
                            LOG.debug(
                                    "Removing expired session finished, current session count: {}",
                                    sessions.size());
                        },
                        checkInterval,
                        checkInterval,
                        TimeUnit.MILLISECONDS);
    }
    ReadableConfig conf = defaultContext.getFlinkConfig();
    // Build the operation thread pool, which executes the gateway's actual operations
    operationExecutorService =
            ThreadUtils.newThreadPool(
                    conf.get(SQL_GATEWAY_WORKER_THREADS_MIN),
                    conf.get(SQL_GATEWAY_WORKER_THREADS_MAX),
                    conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(),
                    OPERATION_POOL_NAME);
}
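Inside SessionManager, Flink starts a session timeout check task (only when both checkInterval and idleTimeout are positive) and builds an operation thread pool, which accepts and runs the operations submitted to the sql-gateway.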
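The periodic idle-session cleanup can be sketched on its own as below. This is a simplified illustration under stated assumptions, not the SessionManagerImpl code: sessions are modeled as a hypothetical map from a session id to its last access time, and `touch`, `removeExpired`, and `sessionCount` are names invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleSessionCleanupSketch {
    // Hypothetical session model: session id -> last access time in milliseconds.
    private final Map<String, Long> lastAccess = new ConcurrentHashMap<>();
    private final long idleTimeoutMs;
    private ScheduledExecutorService cleanupService;

    public IdleSessionCleanupSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records that the session was used at the given time. */
    public void touch(String sessionId, long nowMs) {
        lastAccess.put(sessionId, nowMs);
    }

    /** Removes sessions idle for longer than the timeout and returns how many were removed. */
    public int removeExpired(long nowMs) {
        int removed = 0;
        for (Map.Entry<String, Long> entry : lastAccess.entrySet()) {
            boolean expired = nowMs - entry.getValue() > idleTimeoutMs;
            // remove(key, value) only succeeds if the session was not touched in the meantime.
            if (expired && lastAccess.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    /** Schedules the cleanup at a fixed rate, mirroring the guard on interval and timeout. */
    public void start(long checkIntervalMs) {
        if (checkIntervalMs > 0 && idleTimeoutMs > 0) {
            cleanupService = Executors.newSingleThreadScheduledExecutor();
            cleanupService.scheduleAtFixedRate(
                    () -> removeExpired(System.currentTimeMillis()),
                    checkIntervalMs,
                    checkIntervalMs,
                    TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        if (cleanupService != null) {
            cleanupService.shutdownNow();
        }
    }

    public int sessionCount() {
        return lastAccess.size();
    }

    public static void main(String[] args) throws InterruptedException {
        IdleSessionCleanupSketch manager = new IdleSessionCleanupSketch(50);
        manager.touch("s1", System.currentTimeMillis());
        manager.start(20);
        Thread.sleep(300); // give the background task time to expire the session
        manager.stop();
        System.out.println("remaining sessions: " + manager.sessionCount());
    }
}
```

Passing the current time into `removeExpired` keeps the expiry rule testable without waiting on a real clock.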
Next, let's look at SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint():
public static List<SqlGatewayEndpoint> createSqlGatewayEndpoint(
        SqlGatewayService service, Configuration configuration) {
    // Load the sql-gateway.endpoint.type option
    List<String> identifiers = configuration.get(SQL_GATEWAY_ENDPOINT_TYPE);
    if (identifiers == null || identifiers.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Endpoint options do not contain an option key '%s' for discovering an endpoint.",
                        SQL_GATEWAY_ENDPOINT_TYPE.key()));
    }
    // Validate the option values
    validateSpecifiedEndpointsAreUnique(identifiers);
    List<SqlGatewayEndpoint> endpoints = new ArrayList<>();
    for (String identifier : identifiers) {
        // Match the string against each factory's identifier, returned by Factory.factoryIdentifier()
        // For example, the IDENTIFIER of HiveServer2EndpointFactory is hiveserver2
        final SqlGatewayEndpointFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        SqlGatewayEndpointFactory.class,
                        identifier);
        endpoints.add(
                factory.createSqlGatewayEndpoint(
                        new DefaultEndpointFactoryContext(
                                service,
                                configuration,
                                getEndpointConfig(configuration, identifier))));
    }
    return endpoints;
}
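This method reads the sql-gateway.endpoint.type option to get the endpoint types, compares each configured string against the identifiers of the available factory classes to find the matching factory, and uses that factory to create the endpoint instance.
For example, if sql-gateway.endpoint.type is set to hiveserver2, the factory loaded here is HiveServer2EndpointFactory, and the instance it creates is a HiveServer2Endpoint.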
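The identifier-based lookup described above can be sketched as follows. This is a simplified illustration, not FactoryUtil itself: the candidate factories are a hard-coded list here rather than being discovered from the classpath, and `EndpointFactory`, `named`, and `discover` are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EndpointFactoryLookupSketch {
    /** Hypothetical, simplified factory contract: an identifier plus a create method. */
    interface EndpointFactory {
        String factoryIdentifier();

        String createEndpoint();
    }

    /** Builds a toy factory whose "endpoint" is just a descriptive string. */
    static EndpointFactory named(String identifier) {
        return new EndpointFactory() {
            @Override
            public String factoryIdentifier() {
                return identifier;
            }

            @Override
            public String createEndpoint() {
                return identifier + "-endpoint";
            }
        };
    }

    // Assumption: a fixed candidate list stands in for classpath discovery.
    static final List<EndpointFactory> AVAILABLE =
            Arrays.asList(named("rest"), named("hiveserver2"));

    /** Returns the single factory whose identifier equals the configured string exactly. */
    static EndpointFactory discover(List<EndpointFactory> candidates, String identifier) {
        List<EndpointFactory> matches =
                candidates.stream()
                        .filter(f -> f.factoryIdentifier().equals(identifier))
                        .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalArgumentException(
                    "No factory found for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException(
                    "Multiple factories found for identifier '" + identifier + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        System.out.println(discover(AVAILABLE, "hiveserver2").createEndpoint());
    }
}
```

In this sketch the comparison is an exact string match, so a value such as `hiveServer2` would not resolve to the `hiveserver2` factory.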
Finally, Flink starts each endpoint through SqlGatewayEndpoint.start(). In our case this calls the start method of HiveServer2Endpoint.
Let's step into org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.start():
@Override
public void start() throws Exception {
    buildTThreadPoolServer();
    serverThread.start();
}
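First, note that HiveServer2Endpoint implements the Runnable interface, so its run method executes on a thread (serverThread here). Let's open the buildTThreadPoolServer method: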
private void buildTThreadPoolServer() {
    executor =
            ThreadUtils.newThreadPool(
                    minWorkerThreads,
                    maxWorkerThreads,
                    workerKeepAliveTime.toMillis(),
                    "hiveserver2-endpoint-thread-pool");
    try {
        server =
                new TThreadPoolServer(
                        new TThreadPoolServer.Args(new TServerSocket(socketAddress))
                                .processorFactory(
                                        new TProcessorFactory(
                                                new TCLIService.Processor<>(this)))
                                .transportFactory(new TTransportFactory())
                                // Currently, only support binary mode.
                                .protocolFactory(new TBinaryProtocol.Factory())
                                .inputProtocolFactory(
                                        new TBinaryProtocol.Factory(
                                                true, true, maxMessageSize, maxMessageSize))
                                .requestTimeout(requestTimeoutMs)
                                .requestTimeoutUnit(TimeUnit.MILLISECONDS)
                                .beBackoffSlotLength(backOffSlotLengthMs)
                                .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
                                .executorService(executor));
    } catch (Exception e) {
        throw new SqlGatewayException("Failed to build the server.", e);
    }
}
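Here Flink builds a Hive Thrift server that is ready to listen on the port and establish sessions. There is not much more to say about it, so we go back up one level and continue with the thread start call in HiveServer2Endpoint.start().
Let's look at the run method of HiveServer2Endpoint: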
@Override
public void run() {
    try {
        LOG.info("HiveServer2 Endpoint begins to listen on {}.", socketAddress.toString());
        server.serve();
    } catch (Throwable t) {
        LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
        System.exit(-1);
    }
}
Here the server we just built is started and begins listening on socketAddress, the address configured for the endpoint.
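The overall shape of this startup, a Runnable whose blocking serve loop runs on a dedicated thread and hands each request to a worker pool, can be sketched as follows. This is an analogy under stated assumptions, not the Thrift server: an in-memory queue stands in for the server socket, and all names (`EndpointThreadSketch`, `submit`, the `STOP` marker) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EndpointThreadSketch implements Runnable {
    // Hypothetical marker request that tells the serve loop to exit.
    private static final String STOP = "__stop__";

    // Assumption: an in-memory queue replaces the network socket of a real server.
    private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Thread serverThread = new Thread(this, "endpoint-sketch");

    /** Returns immediately; the blocking loop runs on the dedicated server thread. */
    public void start() {
        serverThread.start();
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = requests.take(); // blocks until a request arrives
                if (STOP.equals(request)) {
                    break;
                }
                // Hand the request to the worker pool so the loop can keep accepting.
                workers.execute(() -> handled.add("handled:" + request));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void submit(String request) {
        requests.add(request);
    }

    /** Stops the loop, waits for in-flight work, and returns what was handled. */
    public List<String> stop() {
        requests.add(STOP);
        try {
            serverThread.join();
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        EndpointThreadSketch endpoint = new EndpointThreadSketch();
        endpoint.start();
        endpoint.submit("OpenSession");
        endpoint.submit("ExecuteStatement");
        System.out.println(endpoint.stop());
    }
}
```

Running the loop on its own thread is what lets start() return to the caller while the server keeps serving.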
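2.3 Job Execution Flow of the HiveServer2 SQL Gateway
Because the endpoint uses Hive's Thrift interface, the flow closely resembles the call and callback flow of Hive Thrift.
As an example, we first run the following demo code: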
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JDBCConnect {
    public static void main(String[] args) throws Exception {
        try (
                // Please replace the JDBC URI with your actual host, port and database.
                Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default;auth=noSasl");
                Statement statement = connection.createStatement()) {
            statement.execute("CREATE TABLE testTable4 (name STRING, age INT)");
            statement.execute("SHOW TABLES");
            ResultSet resultSet = statement.getResultSet();
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
            }
        }
    }
}
When this code runs, it opens a connection over Thrift through JDBC, which triggers the server-side OpenSession method. Let's go into HiveServer2Endpoint and open OpenSession:
@Override
public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
    LOG.debug("Client protocol version: {}.", tOpenSessionReq.getClient_protocol());
    TOpenSessionResp resp = new TOpenSessionResp();
    try {
        // negotiate connection protocol
        TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol();
        // the session version is not larger than the server version because of the
        // min(server_version, ...)
        HiveServer2EndpointVersion sessionVersion =
                HiveServer2EndpointVersion.valueOf(
                        TProtocolVersion.findByValue(
                                Math.min(
                                        clientProtocol.getValue(),
                                        SERVER_VERSION.getVersion().getValue())));
        // prepare session environment
        Map<String, String> originSessionConf =
                tOpenSessionReq.getConfiguration() == null
                        ? Collections.emptyMap()
                        : tOpenSessionReq.getConfiguration();
        HiveConf conf = ... ...
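The protocol negotiation step in OpenSession, taking the lower of the client and server versions, can be sketched in isolation. This is a minimal illustration: the `Version` enum and its numeric values are hypothetical and do not correspond to the real TProtocolVersion constants.

```java
public class ProtocolNegotiationSketch {
    /** Hypothetical protocol versions; the numeric values are illustrative only. */
    enum Version {
        V1(0),
        V2(1),
        V3(2);

        final int value;

        Version(int value) {
            this.value = value;
        }

        /** Maps a numeric value back to its version, similar in spirit to a findByValue lookup. */
        static Version findByValue(int value) {
            for (Version v : values()) {
                if (v.value == value) {
                    return v;
                }
            }
            throw new IllegalArgumentException("Unknown protocol version value: " + value);
        }
    }

    /** The session never uses a version newer than what either side supports. */
    static Version negotiate(Version client, Version server) {
        return Version.findByValue(Math.min(client.value, server.value));
    }

    public static void main(String[] args) {
        // A newer client talking to an older server falls back to the server's version.
        System.out.println(negotiate(Version.V3, Version.V2));
    }
}
```

Taking the minimum means an older client still gets a session at its own version, while a newer client is capped at the server's version.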