Flink SQL Client Source Code Analysis
Posted by zhisheng_blog
Abstract
This article is based on Flink 1.12-SNAPSHOT and walks through the whole flow of submitting an INSERT statement from the SQL Client command line.
sql-client.sh embedded --update "INSERT INTO user_log_sink2 SELECT * FROM user_log"
Initialize the environment
Entry point: org.apache.flink.table.client.SqlClient#main
public static void main(String[] args) {
    if (args.length < 1) {
        CliOptionsParser.printHelpClient();
        return;
    }

    switch (args[0]) {

        case MODE_EMBEDDED:
            // remove mode
            final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    final SqlClient client = new SqlClient(true, options);
                    client.start();
                } catch (SqlClientException e) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop.", e);
                    throw e;
                } catch (Throwable t) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                    throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                }
            }
            break;

        case MODE_GATEWAY:
            throw new SqlClientException("Gateway mode is not supported yet.");

        default:
            CliOptionsParser.printHelpClient();
    }
}
The method first checks the number of arguments and uses the first one to choose the execution mode, embedded or gateway; in this walkthrough we go down the embedded branch.
Next, the command-line arguments are parsed.
The options currently supported in embedded mode are listed in org.apache.flink.table.client.cli.CliOptionsParser#parseEmbeddedModeClient; org.apache.flink.table.client.cli.CliOptionsParser is the class responsible for parsing the command line.
A SqlClient instance is then created from the parsed options and its start method is called:
private void start() {
    if (isEmbedded) {
        // create local executor with default environment
        final List<URL> jars;
        if (options.getJars() != null) {
            jars = options.getJars();
        } else {
            jars = Collections.emptyList();
        }
        final List<URL> libDirs;
        if (options.getLibraryDirs() != null) {
            libDirs = options.getLibraryDirs();
        } else {
            libDirs = Collections.emptyList();
        }
        final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
        executor.start();

        // create CLI client with session environment
        final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
        appendPythonConfig(sessionEnv, options.getPythonConfiguration());
        final SessionContext context;
        if (options.getSessionId() == null) {
            context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
        } else {
            context = new SessionContext(options.getSessionId(), sessionEnv);
        }

        // Open an new session
        String sessionId = executor.openSession(context);
        try {
            // add shutdown hook
            Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

            // do the actual work
            openCli(sessionId, executor);
        } finally {
            executor.closeSession(sessionId);
        }
    } else {
        throw new SqlClientException("Gateway mode is not supported yet.");
    }
}
A local executor is first instantiated from the default sql-client-defaults.yaml configuration file and its start method is called; for the embedded mode this method does nothing.
The session environment is then read and used to build a SessionContext. Note that the session environment is simply the configuration file the user passed with the -e option.
A brief word on org.apache.flink.table.client.gateway.SessionContext: it describes a session and is mainly used to open a new session on the backend. When the client asks to open a new session, the backend Executor maintains an org.apache.flink.table.client.gateway.local.ExecutionContext for it, and every subsequent client interaction has to carry this session ID.
The context object is then passed to executor.openSession to obtain a sessionId.
Finally a shutdown hook is registered. Its main job is to cancel the submitted query jobs before the SQL client shuts down, so that queries do not keep running on the cluster and waste resources; the actual cancellation happens in cancelQueryInternal (a small illustrative sketch of the hook pattern follows the method below).
private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
    final DynamicResult<T> result = resultStore.getResult(resultId);
    if (result == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // stop retrieval and remove the result
    LOG.info("Cancelling job {} and result retrieval.", resultId);
    result.close();
    resultStore.removeResult(resultId);

    // stop Flink job
    try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
        ClusterClient<T> clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
            try {
                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
            } catch (Throwable t) {
                // the job might has finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } catch (Exception e) {
                // ignore
            }
        }
    } catch (SqlExecutionException e) {
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
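For readers less familiar with JVM shutdown hooks, here is a minimal, hedged sketch of the pattern EmbeddedShutdownThread is built on. The class name and the cleanup body are illustrative only; the real hook closes the session through the Executor, which then runs cancelQueryInternal for each active result.

public class ShutdownHookSketch {
    public static void main(String[] args) throws Exception {
        // Register a hook that runs when the JVM shuts down (e.g. on Ctrl+C).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Hypothetical cleanup; in the SQL client this is executor.closeSession(sessionId).
            System.out.println("Cancelling running query jobs before the JVM exits ...");
        }));
        System.out.println("client running, press Ctrl+C to trigger the hook");
        Thread.sleep(10_000);
    }
}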
Finally, the sessionId and the LocalExecutor instance are passed to openCli, which is where the actual work happens.
Opens the CLI client for executing SQL statements.
/**
 * Opens the CLI client for executing SQL statements.
 *
 * @param sessionId session identifier for the current client.
 * @param executor executor
 */
private void openCli(String sessionId, Executor executor) {
    CliClient cli = null;
    try {
        Path historyFilePath;
        if (options.getHistoryFilePath() != null) {
            historyFilePath = Paths.get(options.getHistoryFilePath());
        } else {
            historyFilePath = Paths.get(System.getProperty("user.home"),
                    SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
        }
        cli = new CliClient(sessionId, executor, historyFilePath);
        // interactive CLI mode
        if (options.getUpdateStatement() == null) {
            cli.open();
        }
        // execute single update statement
        else {
            final boolean success = cli.submitUpdate(options.getUpdateStatement());
            if (!success) {
                throw new SqlClientException("Could not submit given SQL update statement to cluster.");
            }
        }
    } finally {
        if (cli != null) {
            cli.close();
        }
    }
}
It first checks whether a historyFilePath was given on the command line; if none was explicitly specified, .flink-sql-history under the current user's home directory is used as the history file.
Because we passed the SQL statement on the command line via the --update option, the client does not enter the interactive terminal mode but directly executes the single update statement.
cli.submitUpdate(options.getUpdateStatement())
Here options.getUpdateStatement() returns the SQL statement passed on the command line, namely INSERT INTO user_log_sink2 SELECT * FROM user_log.
Execute single update statement
The submitUpdate method is executed:
/**
 * Submits a SQL update statement and prints status information and/or errors on the terminal.
 *
 * @param statement SQL update statement
 * @return flag to indicate if the submission was successful or not
 */
public boolean submitUpdate(String statement) {
    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
    terminal.writer().println(new AttributedString(statement).toString());
    terminal.flush();

    final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
    // only support INSERT INTO/OVERWRITE
    return parsedStatement.map(cmdCall -> {
        switch (cmdCall.command) {
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                return callInsert(cmdCall);
            default:
                printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
                return false;
        }
    }).orElse(false);
}
First, two lines of information are printed:
[INFO] Executing the following statement:
INSERT INTO user_log_sink2 SELECT * FROM user_log
Parsing SQL Statement
Right after that, the SQL statement is parsed:
private Optional<SqlCommandCall> parseCommand(String line) {
    final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(executor.getSqlParser(sessionId), line);
    if (!parsedLine.isPresent()) {
        printError(CliStrings.MESSAGE_UNKNOWN_SQL);
    }
    return parsedLine;
}
First, a Parser object is obtained from executor.getSqlParser(sessionId):
@Override
public Parser getSqlParser(String sessionId) {
    final ExecutionContext<?> context = getExecutionContext(sessionId);
    final TableEnvironment tableEnv = context.getTableEnvironment();
    final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
    return new Parser() {
        @Override
        public List<Operation> parse(String statement) {
            return context.wrapClassLoader(() -> parser.parse(statement));
        }

        @Override
        public UnresolvedIdentifier parseIdentifier(String identifier) {
            return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
        }
    };
}
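The wrapClassLoader calls above (and later in applyUpdate and createPipeline) all follow the same idea: jars passed on the command line live in a dedicated user classloader, and every call into the table environment runs with that classloader installed as the thread context classloader. Below is a hedged sketch of this pattern; the class and field names are illustrative and do not claim to match the exact ExecutionContext implementation.

import java.util.function.Supplier;

// Illustrative sketch of the classloader-wrapping pattern used by ExecutionContext.
final class ClassLoaderWrappingSketch {
    private final ClassLoader userClassLoader; // assumption: holds the session's user jars

    ClassLoaderWrappingSketch(ClassLoader userClassLoader) {
        this.userClassLoader = userClassLoader;
    }

    <R> R wrapClassLoader(Supplier<R> supplier) {
        final ClassLoader original = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            // run the given action with the user classloader visible to it
            return supplier.get();
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}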
The Parser object and the SQL statement are then passed to SqlCommandParser.parse; org.apache.flink.table.client.cli.SqlCommandParser is a simple parser that determines the command type and its operands.
public static Optional<SqlCommandCall> parse(Parser sqlParser, String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }

    // parse statement via sql parser first
    Optional<SqlCommandCall> callOpt = parseBySqlParser(sqlParser, stmt);
    if (callOpt.isPresent()) {
        return callOpt;
    } else {
        return parseByRegexMatching(stmt);
    }
}
private static Optional<SqlCommandCall> parseBySqlParser(Parser sqlParser, String stmt) {
    List<Operation> operations;
    try {
        operations = sqlParser.parse(stmt);
    } catch (Throwable e) {
        if (e instanceof ValidationException) {
            // can be parsed via sql parser, but is not validated.
            // throw exception directly
            throw new SqlExecutionException("Invalidate SQL statement.", e);
        }
        return Optional.empty();
    }
    if (operations.size() != 1) {
        throw new SqlExecutionException("Only single statement is supported now.");
    }

    final SqlCommand cmd;
    String[] operands = new String[] { stmt };
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
        cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
    } else if (operation instanceof CreateTableOperation) {
        cmd = SqlCommand.CREATE_TABLE;
    } else if (operation instanceof DropTableOperation) {
        cmd = SqlCommand.DROP_TABLE;
    } else if (operation instanceof AlterTableOperation) {
        cmd = SqlCommand.ALTER_TABLE;
    } else if (operation instanceof CreateViewOperation) {
        cmd = SqlCommand.CREATE_VIEW;
        CreateViewOperation op = (CreateViewOperation) operation;
        operands = new String[] { op.getViewIdentifier().asSerializableString(),
                op.getCatalogView().getOriginalQuery() };
    } else if (operation instanceof DropViewOperation) {
        cmd = SqlCommand.DROP_VIEW;
        operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };
    } else if (operation instanceof CreateDatabaseOperation) {
        cmd = SqlCommand.CREATE_DATABASE;
    } else if (operation instanceof DropDatabaseOperation) {
        cmd = SqlCommand.DROP_DATABASE;
    } else if (operation instanceof AlterDatabaseOperation) {
        cmd = SqlCommand.ALTER_DATABASE;
    } else if (operation instanceof CreateCatalogOperation) {
        cmd = SqlCommand.CREATE_CATALOG;
    } else if (operation instanceof DropCatalogOperation) {
        cmd = SqlCommand.DROP_CATALOG;
    } else if (operation instanceof UseCatalogOperation) {
        cmd = SqlCommand.USE_CATALOG;
        operands = new String[] { ((UseCatalogOperation) operation).getCatalogName() };
    } else if (operation instanceof UseDatabaseOperation) {
        cmd = SqlCommand.USE;
        operands = new String[] { ((UseDatabaseOperation) operation).getDatabaseName() };
    } else if (operation instanceof ShowCatalogsOperation) {
        cmd = SqlCommand.SHOW_CATALOGS;
        operands = new String[0];
    } else if (operation instanceof ShowDatabasesOperation) {
        cmd = SqlCommand.SHOW_DATABASES;
        operands = new String[0];
    } else if (operation instanceof ShowTablesOperation) {
        cmd = SqlCommand.SHOW_TABLES;
        operands = new String[0];
    } else if (operation instanceof ShowFunctionsOperation) {
        cmd = SqlCommand.SHOW_FUNCTIONS;
        operands = new String[0];
    } else if (operation instanceof CreateCatalogFunctionOperation ||
            operation instanceof CreateTempSystemFunctionOperation) {
        cmd = SqlCommand.CREATE_FUNCTION;
    } else if (operation instanceof DropCatalogFunctionOperation ||
            operation instanceof DropTempSystemFunctionOperation) {
        cmd = SqlCommand.DROP_FUNCTION;
    } else if (operation instanceof AlterCatalogFunctionOperation) {
        cmd = SqlCommand.ALTER_FUNCTION;
    } else if (operation instanceof ExplainOperation) {
        cmd = SqlCommand.EXPLAIN;
    } else if (operation instanceof DescribeTableOperation) {
        cmd = SqlCommand.DESCRIBE;
        operands = new String[] { ((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString() };
    } else if (operation instanceof QueryOperation) {
        cmd = SqlCommand.SELECT;
    } else {
        cmd = null;
    }

    return cmd == null ? Optional.empty() : Optional.of(new SqlCommandCall(cmd, operands));
}
This eventually returns to the call site in org.apache.flink.table.client.cli.CliClient#submitUpdate, final Optional<SqlCommandCall> parsedStatement = parseCommand(statement). For our statement the operation is a non-overwrite CatalogSinkModifyOperation, so the result is a SqlCommandCall with command INSERT_INTO and the full statement as its only operand. Then the following is executed:
// only support INSERT INTO/OVERWRITE
return parsedStatement.map(cmdCall -> {
    switch (cmdCall.command) {
        case INSERT_INTO:
        case INSERT_OVERWRITE:
            return callInsert(cmdCall);
        default:
            printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
            return false;
    }
}).orElse(false);
Call Insert Method
We step into the callInsert method:
private boolean callInsert(SqlCommandCall cmdCall) {
    printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);

    try {
        final ProgramTargetDescriptor programTarget = executor.executeUpdate(sessionId, cmdCall.operands[0]);
        terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
        terminal.writer().println(programTarget.toString());
        terminal.flush();
    } catch (SqlExecutionException e) {
        printExecutionException(e);
        return false;
    }
    return true;
}
It first prints a line of information on the terminal:
[INFO] Submitting SQL update statement to the cluster...
Then executor.executeUpdate(sessionId, cmdCall.operands[0]) is invoked; on success, the submitted message and the ProgramTargetDescriptor (which carries the job ID) are printed as well:
@Override
public ProgramTargetDescriptor executeUpdate(String sessionId, String statement) throws SqlExecutionException {
    final ExecutionContext<?> context = getExecutionContext(sessionId);
    return executeUpdateInternal(sessionId, context, statement);
}
We then enter the executeUpdateInternal method:
private <C> ProgramTargetDescriptor executeUpdateInternal(
        String sessionId,
        ExecutionContext<C> context,
        String statement) {

    applyUpdate(context, statement);

    //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
    if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
        return null;
    }

    // create pipeline
    final String jobName = sessionId + ": " + statement;
    final Pipeline pipeline;
    try {
        pipeline = context.createPipeline(jobName);
    } catch (Throwable t) {
        // catch everything such that the statement does not crash the executor
        throw new SqlExecutionException("Invalid SQL statement.", t);
    }

    // create a copy so that we can change settings without affecting the original config
    Configuration configuration = new Configuration(context.getFlinkConfig());
    // for update queries we don't wait for the job result, so run in detached mode
    configuration.set(DeploymentOptions.ATTACHED, false);

    // create execution
    final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

    // blocking deployment
    try {
        JobClient jobClient = deployer.deploy().get();
        return ProgramTargetDescriptor.of(jobClient.getJobID());
    } catch (Exception e) {
        throw new RuntimeException("Error running SQL job.", e);
    }
}
Buffer List
First we step into the applyUpdate(context, statement) method:
/**
 * Applies the given update statement to the given table environment with query configuration.
 */
private <C> void applyUpdate(ExecutionContext<C> context, String updateStatement) {
    final TableEnvironment tableEnv = context.getTableEnvironment();
    try {
        // TODO replace sqlUpdate with executeSql
        // This needs we do more refactor, because we can't set the flinkConfig in ExecutionContext
        // into StreamExecutionEnvironment
        context.wrapClassLoader(() -> tableEnv.sqlUpdate(updateStatement));
    } catch (Throwable t) {
        // catch everything such that the statement does not crash the executor
        throw new SqlExecutionException("Invalid SQL update statement.", t);
    }
}
Next is tableEnv.sqlUpdate(updateStatement):
@Override
public void sqlUpdate(String stmt) {
    List<Operation> operations = parser.parse(stmt);

    if (operations.size() != 1) {
        throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
    }

    Operation operation = operations.get(0);

    if (operation instanceof ModifyOperation) {
        buffer(Collections.singletonList((ModifyOperation) operation));
    } else if (operation instanceof CreateTableOperation ||
            operation instanceof DropTableOperation ||
            operation instanceof AlterTableOperation ||
            operation instanceof CreateViewOperation ||
            operation instanceof DropViewOperation ||
            operation instanceof CreateDatabaseOperation ||
            operation instanceof DropDatabaseOperation ||
            operation instanceof AlterDatabaseOperation ||
            operation instanceof CreateCatalogFunctionOperation ||
            operation instanceof CreateTempSystemFunctionOperation ||
            operation instanceof DropCatalogFunctionOperation ||
            operation instanceof DropTempSystemFunctionOperation ||
            operation instanceof AlterCatalogFunctionOperation ||
            operation instanceof CreateCatalogOperation ||
            operation instanceof DropCatalogOperation ||
            operation instanceof UseCatalogOperation ||
            operation instanceof UseDatabaseOperation) {
        executeOperation(operation);
    } else {
        throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
    }
}
The parse(stmt) method ultimately returns Collections.singletonList(operation):
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    // parse the sql query
    SqlNode parsed = parser.parse(statement);

    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
            .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}
Because our INSERT produces a CatalogSinkModifyOperation, which is a ModifyOperation, buffer(Collections.singletonList((ModifyOperation) operation)) simply collects the operation into a buffer:
private void buffer(List<ModifyOperation> modifyOperations) {
    bufferedModifyOperations.addAll(modifyOperations);
}
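To make the buffering concrete, here is a minimal, hedged sketch of the same flow written directly against the TableEnvironment API of this Flink version: sqlUpdate() only buffers the INSERT as a ModifyOperation, and nothing is submitted until execute() is called. The datagen/blackhole table definitions are illustrative stand-ins for the real user_log and user_log_sink2 tables defined in the YAML environment.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlUpdateBufferSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Illustrative tables; the real user_log / user_log_sink2 come from the environment file.
        tableEnv.executeSql("CREATE TABLE user_log (user_id STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql("CREATE TABLE user_log_sink2 (user_id STRING) WITH ('connector' = 'blackhole')");

        // Only buffered: adds a ModifyOperation to bufferedModifyOperations.
        tableEnv.sqlUpdate("INSERT INTO user_log_sink2 SELECT * FROM user_log");

        // Drains the buffer, builds the pipeline and submits the job.
        tableEnv.execute("insert-job");
    }
}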
Create pipeline and blocking deployment
Back in org.apache.flink.table.client.gateway.local.LocalExecutor#executeUpdateInternal:
//Todo: we should refactor following condition after TableEnvironment has support submit job directly.
if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
    return null;
}

// create pipeline
final String jobName = sessionId + ": " + statement;
final Pipeline pipeline;
try {
    pipeline = context.createPipeline(jobName);
} catch (Throwable t) {
    // catch everything such that the statement does not crash the executor
    throw new SqlExecutionException("Invalid SQL statement.", t);
}

// create a copy so that we can change settings without affecting the original config
Configuration configuration = new Configuration(context.getFlinkConfig());
// for update queries we don't wait for the job result, so run in detached mode
configuration.set(DeploymentOptions.ATTACHED, false);

// create execution
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

// blocking deployment
try {
    JobClient jobClient = deployer.deploy().get();
    return ProgramTargetDescriptor.of(jobClient.getJobID());
} catch (Exception e) {
    throw new RuntimeException("Error running SQL job.", e);
}
context.createPipeline(jobName)
public Pipeline createPipeline(String name) {
    return wrapClassLoader(() -> {
        if (streamExecEnv != null) {
            StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
            return streamTableEnv.getPipeline(name);
        } else {
            BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
            return batchTableEnv.getPipeline(name);
        }
    });
}
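getPipeline(name) hands back a Pipeline, which in the streaming case is the StreamGraph built from the buffered operations. As a rough intuition (a hedged sketch, not the SQL client's code path), the analogous call in the public DataStream API looks like this; StreamGraph implements the Pipeline interface that ProgramDeployer later receives.

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // The StreamGraph of the job is a Pipeline; ProgramDeployer works against this interface.
        Pipeline pipeline = env.getStreamGraph("example-job");
        System.out.println(pipeline);
    }
}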
The job is submitted in detached mode:
configuration.set(DeploymentOptions.ATTACHED, false);
// create execution
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
org.apache.flink.table.client.gateway.local.ProgramDeployer is responsible for deploying a table program on a cluster.
The Flink job is submitted asynchronously:
public CompletableFuture<JobClient> deploy() {
    LOG.info("Submitting job {} for query {}", pipeline, jobName);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
    }

    if (configuration.get(DeploymentOptions.TARGET) == null) {
        throw new RuntimeException("No execution.target specified in your configuration file.");
    }

    PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
    final PipelineExecutorFactory executorFactory;
    try {
        executorFactory = executorServiceLoader.getExecutorFactory(configuration);
    } catch (Exception e) {
        throw new RuntimeException("Could not retrieve ExecutorFactory.", e);
    }

    final PipelineExecutor executor = executorFactory.getExecutor(configuration);
    CompletableFuture<JobClient> jobClient;
    try {
        jobClient = executor.execute(pipeline, configuration);
    } catch (Exception e) {
        throw new RuntimeException("Could not execute program.", e);
    }
    return jobClient;
}
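deploy() resolves the PipelineExecutor from the execution.target entry of the configuration, which the SQL client ultimately takes from the deployment settings of the environment file / flink-conf.yaml. Below is a hedged sketch of that resolution using the same public classes; the "remote" target value is only an example.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

public class ExecutorResolutionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.TARGET, "remote");  // example value: a standalone session cluster
        conf.set(DeploymentOptions.ATTACHED, false);   // detached, as the SQL client does for updates

        // Same service-loader lookup as in deploy() above.
        PipelineExecutorFactory factory =
                DefaultExecutorServiceLoader.INSTANCE.getExecutorFactory(conf);
        PipelineExecutor executor = factory.getExecutor(conf);
        System.out.println(executor.getClass().getName());
        // executor.execute(pipeline, conf) would then submit the job asynchronously
        // and return a CompletableFuture<JobClient>, as shown in deploy().
    }
}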
Original article: https://github.com/y0908105023/wiki/wiki/Flink-Sql-Client-%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90
Author: y0908105023
end