Impala Source Code Analysis: FE (Posted 2020-08-24 by 邱明成)
SQL Parsing
Impala's SQL parsing and execution-plan generation are implemented in impala-frontend (Java); queries arrive over the Beeswax interface, which listens on port 21000. A user submits a request via BeeswaxService.query(), and on the impalad side the handling logic lives in void ImpalaServer::query(QueryHandle& query_handle, const Query& query) (ImpalaServer.h):
```cpp
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
  // ...
  shared_ptr<QueryExecState> exec_state;
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // Start a thread to wait for results to become available.
  exec_state->WaitAsync();
  // Once the query is running, do a final check for session closure and add it to
  // the set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}
```
Here QueryToTQueryContext(query, &query_ctx) converts the Query into a TQueryCtx. Its implementation (ImpalaServer.h):
```cpp
Status ImpalaServer::QueryToTQueryContext(const Query& query,
    TQueryCtx* query_ctx) {
  query_ctx->request.stmt = query.query;
  VLOG_QUERY << "query: " << ThriftDebugString(query);
  // ...
  shared_ptr<SessionState> session;
  const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
  RETURN_IF_ERROR(GetSessionState(session_id, &session));
  DCHECK(session != NULL);
  // ...
  {
    lock_guard<mutex> l(session->lock);
    if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
    query_ctx->request.query_options = session->default_query_options;
  }
  // ...
  session->ToThrift(session_id, &query_ctx->session);
  // ...
  // Override the default query options with Query.Configuration.
  if (query.__isset.configuration) {
    BOOST_FOREACH(const string& option, query.configuration) {
      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
    }
    VLOG_QUERY << "TClientRequest.queryOptions: "
               << ThriftDebugString(query_ctx->request.query_options);
  }
  // ...
}
```
It then calls ImpalaServer::Execute() (ImpalaServer.h) to convert the TQueryCtx into a TExecRequest; the actual work is delegated to ImpalaServer::ExecuteInternal(). The code:
```cpp
Status ImpalaServer::Execute(TQueryCtx* query_ctx,
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  PrepareQueryContext(query_ctx);
  bool registered_exec_state;
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
  // ...
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  // ...
  query_ctx->request.__set_redacted_stmt((const string) stmt);

  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
      exec_state);
  if (!status.ok() && registered_exec_state) {
    UnregisterQuery((*exec_state)->query_id(), false, &status);
  }
  return status;
}
```
The function above calls ImpalaServer::ExecuteInternal() (ImpalaServer.h), which in turn calls frontend.createExecRequest() through the JNI interface to produce the TExecRequest:
```cpp
Status ImpalaServer::ExecuteInternal(
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
  // Reject queries while the server is offline.
  if (IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }
  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
      this, session_state));
  // ...
  (*exec_state)->query_events()->MarkEvent("Start execution");
  // ...
  TExecRequest result;
  {
    lock_guard<mutex> l(*(*exec_state)->lock());
    // ...
    RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
    *registered_exec_state = true;
    // Planning happens here: call into the Java frontend over JNI.
    RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
        exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
    (*exec_state)->query_events()->MarkEvent("Planning finished");
    (*exec_state)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    if (result.__isset.result_set_metadata) {
      (*exec_state)->set_result_metadata(result.result_set_metadata);
    }
  }
  // ...
  VLOG(2) << "Execution request: " << ThriftDebugString(result);
  // ...
  RETURN_IF_ERROR((*exec_state)->Exec(&result));
  if (result.stmt_type == TStmtType::DDL) {
    Status status = UpdateCatalogMetrics();
    if (!status.ok()) {
      VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
    }
  }
  // ...
  if ((*exec_state)->coord() != NULL) {
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
        query_locations_[port].insert((*exec_state)->query_id());
      }
    }
  }
  return Status::OK();
}
```
Frontend::GetExecRequest() (Frontend.h) invokes frontend.createExecRequest() through JNI to generate the TExecRequest:
```cpp
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
}
```
JniUtil::CallJniMethod() (jni-util.h) looks like this (the overload shown takes only an argument; the variant used above additionally deserializes a result):
```cpp
template <typename T>
static Status CallJniMethod(const jobject& obj, const jmethodID& method,
    const T& arg) {
  JNIEnv* jni_env = getJNIEnv();
  jbyteArray request_bytes;
  JniLocalFrame jni_frame;
  RETURN_IF_ERROR(jni_frame.push(jni_env));
  // Serialize the Thrift struct into a byte array and hand it to the Java method.
  RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
  jni_env->CallObjectMethod(obj, method, request_bytes);
  RETURN_ERROR_IF_EXC(jni_env);
  return Status::OK();
}
```
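The pattern in this JNI bridge is that the C++ side serializes the Thrift struct to a byte array (SerializeThriftMsg) and passes it across JNI, where the Java side deserializes it back into the generated class. Below is a minimal sketch of that round-trip on the Java side using libthrift's TSerializer/TDeserializer, assuming the binary protocol on both sides; TExample is a hypothetical Thrift-generated struct (say, from `struct TExample { 1: required string stmt }`), standing in for Impala's TQueryCtx:

```java
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class ThriftJniRoundTrip {
  public static void main(String[] args) throws TException {
    // Hypothetical generated struct; Impala's TQueryCtx crosses JNI the same way.
    TExample request = new TExample();
    request.setStmt("select 1");
    // What SerializeThriftMsg() does on the C++ side: struct -> bytes.
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    byte[] requestBytes = serializer.serialize(request);
    // What the Java entry point does with the jbyteArray: bytes -> struct.
    TExample received = new TExample();
    TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
    deserializer.deserialize(received, requestBytes);
    System.out.println("round-trip equal: " + request.equals(received));
  }
}
```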
At this point control crosses over Thrift into the Java frontend, which builds the execution plan tree. public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString) (Frontend.java) is the most important method: it creates a populated TExecRequest from the supplied TQueryCtx. The analysis-related part:
```java
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");
  // ...
}
```
It first analyzes the submitted SQL statement by calling analyzeStmt() (Frontend.java):
```java
/**
 * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
 */
private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
    throws AnalysisException, InternalException, AuthorizationException {
  AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx,
      authzConfig_);
  LOG.debug("analyze query " + queryCtx.request.stmt);
  // ...
  try {
    analysisCtx.analyze(queryCtx.request.stmt);
    Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
    return analysisCtx.getAnalysisResult();
  } catch (AnalysisException e) {
    Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
    // Only re-throw the AnalysisException if there are no missing tables.
    if (missingTbls.isEmpty()) throw e;
    // ...
    // The query references tables that are not yet loaded; request their load and
    // wait for the metadata to arrive.
    if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
      LOG.info(String.format("Missing tables were not received in %dms. Load " +
          "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
    }
    // ... (analysis is retried once the metadata has been loaded)
  } finally {
    // Authorize all accesses.
    analysisCtx.getAnalyzer().authorize(getAuthzChecker());
  }
  // ...
}
```
The Analyzer object held in AnalysisContext.AnalysisResult is a knowledge base for everything the SQL statement touches (tables, conjuncts, slots, slotRefMap, eqJoinConjuncts, and so on); anything related to the statement ends up in the Analyzer. See Analyzer.java for the class definition. AnalysisContext.analyze() (AnalysisContext.java) is implemented as follows:
```java
/**
 * Parse and analyze 'stmt'. If 'stmt' is a nested query (i.e. query that
 * contains subqueries), it is also rewritten by performing subquery unnesting.
 * The transformed stmt is then re-analyzed in a new analysis context.
 */
public void analyze(String stmt) throws AnalysisException {
  Analyzer analyzer = new Analyzer(catalog_, queryCtx_, authzConfig_);
  analyze(stmt, analyzer);
}
```
This analyze() delegates to the overload analyze(String stmt, Analyzer analyzer) (AnalysisContext.java), which does the actual analysis:
```java
/**
 * Parse and analyze 'stmt' using a specified Analyzer.
 */
public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
  SqlScanner input = new SqlScanner(new StringReader(stmt));
  SqlParser parser = new SqlParser(input);
  try {
    analysisResult_ = new AnalysisResult();
    analysisResult_.analyzer_ = analyzer;
    if (analysisResult_.analyzer_ == null) {
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
    }
    // ...
    analysisResult_.stmt_ = (StatementBase) parser.parse().value;
    if (analysisResult_.stmt_ == null) return;
    // ...
    if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
      analysisResult_.tmpCreateTableStmt_ =
          ((CreateTableAsSelectStmt) analysisResult_.stmt_).getCreateStmt().clone();
    }
    // ...
    analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
    boolean isExplain = analysisResult_.isExplainStmt();
    // ...
    // If the statement requires a rewrite (e.g. subquery unnesting), re-analyze
    // the rewritten statement in a fresh analysis context.
    if (analysisResult_.requiresRewrite()) {
      StatementBase rewrittenStmt = StmtRewriter.rewrite(analysisResult_);
      // ...
      Preconditions.checkNotNull(rewrittenStmt);
      analysisResult_ = new AnalysisResult();
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
      analysisResult_.stmt_ = rewrittenStmt;
      analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
      LOG.trace("rewrittenStmt: " + rewrittenStmt.toSql());
      // ...
      if (isExplain) analysisResult_.stmt_.setIsExplain();
      // ...
    }
  } catch (AnalysisException e) {
    // ...
    throw e;
  } catch (Exception e) {
    throw new AnalysisException(parser.getErrorMsg(stmt), e);
  }
}
```
The actual parsing is done by the SqlScanner and SqlParser classes; see sql-scanner.flex and sql-parser.y.
Analyzing a SQL statement proceeds roughly as follows (a toy sketch of the conjunct registry mentioned in steps 1-3 follows the list):
1. Process the tables referenced by the statement (the TableRefs), which are extracted from the FROM clause (keywords from, join, on/using). Note that a JOIN operation and its on/using condition are stored in, and analyzed with, the TableRef of the right-hand table of the JOIN. Each TableRef is analyze()d in turn and registered with the Analyzer via registerBaseTableRef() (populating its TupleDescriptor). If a TableRef participates in a JOIN, analyzeJoin() is also called; it calls Analyzer.registerConjunct(), which populates several Analyzer members: conjuncts, tuplePredicates (TupleId to conjunct mapping), slotPredicates (SlotId to conjunct mapping), and eqJoinConjuncts.
2. Process the SELECT list (keyword select, plus aggregate functions such as MAX() and AVG()): determine which items the statement selects; each item is an object of an Expr subclass and is appended to resultExprs and colLabels. Each Expr in resultExprs is then recursively analyzed down to the leaves of its tree, registering SlotRefs and the like with the Analyzer.
3. Analyze the WHERE clause (keyword where): first recursively analyze the clause's expression tree, then call Analyzer.registerConjunct() to fill in the same Analyzer members as in step 1 (plus whereClauseConjuncts).
4. Process sorting information (keyword order by): resolve aliases and ordinals, extract the Exprs of the ORDER BY clause into orderingExprs, recursively analyze their expression trees, and finally create a SortInfo object.
5. Process aggregation information (keywords group by and having, aggregates such as avg and max): first recursively analyze the Exprs of the GROUP BY clause; then, if there is a HAVING clause, treat it like a WHERE clause: analyze its expression tree and register conjuncts with the Analyzer.
6. Process inline views.
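To make the registerConjunct() bookkeeping in steps 1-3 concrete, here is a toy, self-contained sketch (hypothetical classes, not Impala's) of the kind of registry the Analyzer maintains: each registered conjunct gets an id and is indexed by the TupleIds and SlotIds it references, so the planner can later look up which predicates become evaluable once a given tuple is materialized:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the Analyzer's conjunct registry: conjuncts indexed by the
// tuple ids and slot ids they reference (cf. tuplePredicates/slotPredicates).
class ConjunctRegistry {
  private final Map<Integer, String> conjuncts = new HashMap<>();
  private final Map<Integer, List<Integer>> tuplePredicates = new HashMap<>();
  private final Map<Integer, List<Integer>> slotPredicates = new HashMap<>();
  private int nextId = 0;

  int registerConjunct(String predicate, List<Integer> tupleIds, List<Integer> slotIds) {
    int id = nextId++;
    conjuncts.put(id, predicate);
    for (int tid : tupleIds) {
      tuplePredicates.computeIfAbsent(tid, k -> new ArrayList<>()).add(id);
    }
    for (int sid : slotIds) {
      slotPredicates.computeIfAbsent(sid, k -> new ArrayList<>()).add(id);
    }
    return id;
  }

  List<Integer> conjunctsForTuple(int tupleId) {
    return tuplePredicates.getOrDefault(tupleId, new ArrayList<>());
  }
}

public class RegisterConjunctDemo {
  public static void main(String[] args) {
    ConjunctRegistry analyzer = new ConjunctRegistry();
    // WHERE t1.a = t2.b registers one conjunct against both tuples (ids 0 and 1).
    analyzer.registerConjunct("t1.a = t2.b", List.of(0, 1), List.of(10, 20));
    System.out.println(analyzer.conjunctsForTuple(0)); // [0]
  }
}
```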
At this point lexical and syntactic analysis are complete. Back in frontend.createExecRequest() (Frontend.java), the members of the TExecRequest are filled in (excerpt):
```java
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");
  // ...
  Preconditions.checkNotNull(analysisResult.getStmt());
  TExecRequest result = new TExecRequest();
  result.setQuery_options(queryCtx.request.getQuery_options());
  result.setAccess_events(analysisResult.getAccessEvents());
  result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();

  if (analysisResult.isCatalogOp()) {
    result.stmt_type = TStmtType.DDL;
    createCatalogOpRequest(analysisResult, result);
    String jsonLineageGraph = analysisResult.getJsonLineageGraph();
    if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
      result.catalog_op_request.setLineage_graph(jsonLineageGraph);
    }
    // ...
    if (!analysisResult.isCreateTableAsSelectStmt()) return result;
  } else if (analysisResult.isLoadDataStmt()) {
    result.stmt_type = TStmtType.LOAD;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("summary", Type.STRING.toThrift()))));
    result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
    return result;
  } else if (analysisResult.isSetStmt()) {
    result.stmt_type = TStmtType.SET;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("option", Type.STRING.toThrift()),
        new TColumn("value", Type.STRING.toThrift()))));
    result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
    return result;
  }
  // ... (the query/DML path continues below)
}
```
For DDL commands (use, show tables, show databases, describe), createCatalogOpRequest() is called. For LOAD DATA and SET statements, the corresponding result-set metadata is built and the statement is converted to its Thrift form.
Execution Plan Generation
The other case is a query or DML command, for which a TQueryExecRequest must be created and populated:
```java
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
      || analysisResult.isCreateTableAsSelectStmt());
  // ...
  TQueryExecRequest queryExecRequest = new TQueryExecRequest();
  // ...
  LOG.debug("create plan");
  Planner planner = new Planner(analysisResult, queryCtx);
  // ...
  ArrayList<PlanFragment> fragments = planner.createPlan();
  // ...
  List<ScanNode> scanNodes = Lists.newArrayList();
  // ...
  Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
  // ...
  for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
    PlanFragment fragment = fragments.get(fragmentId);
    Preconditions.checkNotNull(fragment.getPlanRoot());
    fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
    fragmentIdx.put(fragment, fragmentId);
  }
  // ...
}
```
createPlan() above is the most important function in the frontend: from the analysis result and the query options passed by the client, it generates the execution plan. The plan is represented as an array of PlanFragments, which is eventually serialized into TQueryExecRequest.fragments and handed to the backend coordinator for scheduling and execution. Let's look at createPlan() (Planner.java):
```java
/**
 * Returns a list of plan fragments for executing an analyzed parse tree.
 * May return a single-node or distributed executable plan.
 */
public ArrayList<PlanFragment> createPlan() throws ImpalaException {
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  // ...
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
  ArrayList<PlanFragment> fragments = null;
  // ...
  // Estimate the maximum number of rows the plan will process.
  MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
  singleNodePlan.accept(visitor);
  long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
  boolean isSmallQuery =
      maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
  if (isSmallQuery) {
    // Execute small queries on a single node and skip codegen.
    ctx_.getQueryOptions().setNum_nodes(1);
    ctx_.getQueryOptions().setDisable_codegen(true);
    if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
        (maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0)) {
      // ...
      ctx_.getQueryOptions().setNum_scanner_threads(1);
    }
  }
  // ...
  if (ctx_.isSingleNodeExec()) {
    // Single-node execution: wrap the whole plan tree in one unpartitioned fragment.
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else {
    // Distributed execution: break the plan into fragments.
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }

  PlanFragment rootFragment = fragments.get(fragments.size() - 1);
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    if (!ctx_.isSingleNodeExec()) {
      // ...
      rootFragment = distributedPlanner.createInsertFragment(
          rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
    }
    // ...
    rootFragment.setSink(insertStmt.createDataSink());
  }

  ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
  List<Expr> resultExprs = null;
  Table targetTable = null;
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    resultExprs = insertStmt.getResultExprs();
    targetTable = insertStmt.getTargetTable();
    graph.addTargetColumnLabels(targetTable);
  } else {
    resultExprs = ctx_.getQueryStmt().getResultExprs();
    graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
  }
  resultExprs = Expr.substituteList(resultExprs,
      rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
  rootFragment.setOutputExprs(resultExprs);
  LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
  LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
  LOG.debug("finalize plan fragments");
  for (PlanFragment fragment: fragments) {
    fragment.finalize(ctx_.getRootAnalyzer());
  }
  // ...
  Collections.reverse(fragments);
  ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");

  if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
    // ...
    if (ctx_.isInsertOrCtas()) {
      Preconditions.checkNotNull(targetTable);
      List<Expr> exprs = Lists.newArrayList();
      if (targetTable instanceof HBaseTable) {
        exprs.addAll(resultExprs);
      } else {
        exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
        exprs.addAll(resultExprs.subList(0,
            targetTable.getNonClusteringColumns().size()));
      }
      graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
    } else {
      graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
    }
    LOG.trace("lineage: " + graph.debugString());
    ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
  }
  return fragments;
}
```
createPlan() has two main parts: createSingleNodePlan and createPlanFragments. The first builds a single-node plan tree, all of whose fragments execute on one node (the coordinator); the second builds the distributed plan tree, whose fragments can be assigned to different nodes. Let's first look at SingleNodePlanner.createSingleNodePlan() (SingleNodePlanner.java), which creates the single-node plan tree from the analyzed syntax tree in the planner context and returns its root. The planner recursively traverses the syntax tree and, processing query statements top-down:
- materializes the slots required for evaluating expressions of that statement
- migrates conjuncts from parent blocks into inline views and union operands

Then, in the bottom-up phase, it generates the plan tree for every query statement:
- performs join-order optimization when generating the plan of the FROM clause of a select statement; this requires that all materialized slots are known, for an accurate estimate of the row sizes needed for cost-based join ordering
- assigns the conjuncts that can be evaluated at each node and computes that node's stats (cardinality, etc.)
- applies the combined expression substitution map of child plan nodes; if a plan node re-maps its input, it sets a substitution map to be applied by its parents
The code:
```java
/**
 * Generates and returns the root of the single-node plan for the analyzed parse tree
 * in the planner context.
 */
public PlanNode createSingleNodePlan() throws ImpalaException {
  QueryStmt queryStmt = ctx_.getQueryStmt();
  // ...
  Analyzer analyzer = queryStmt.getAnalyzer();
  analyzer.computeEquivClasses();
  analyzer.getTimeline().markEvent("Equivalence classes computed");
  // ...
  if (queryStmt.getBaseTblResultExprs() != null) {
    analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
  }
  // ...
  LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
  PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
      ctx_.getQueryOptions().isDisable_outermost_topn());
  Preconditions.checkNotNull(singleNodePlan);
  return singleNodePlan;
}
```
It delegates to the private createQueryPlan() (SingleNodePlanner.java), which creates the plan tree for single-node execution, generating PlanNodes for the Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query statement:
```java
/**
 * Create plan tree for single-node execution. Generates PlanNodes for the
 * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
 */
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
    throws ImpalaException {
  // ...
  if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);

  PlanNode root;
  if (stmt instanceof SelectStmt) {
    SelectStmt selectStmt = (SelectStmt) stmt;
    // ...
    root = createSelectPlan(selectStmt, analyzer);
    // ...
    // Add an analytic-evaluation plan on top if there are analytic functions.
    if (((SelectStmt) stmt).getAnalyticInfo() != null) {
      AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
      ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
      stmt.getMaterializedTupleIds(stmtTupleIds);
      AnalyticPlanner analyticPlanner =
          new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
      List<Expr> inputPartitionExprs = Lists.newArrayList();
      AggregateInfo aggInfo = selectStmt.getAggInfo();
      root = analyticPlanner.createSingleNodePlan(root,
          aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
      if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
        // ...
        aggInfo.setPartitionExprs(inputPartitionExprs);
      }
    }
  } else {
    Preconditions.checkState(stmt instanceof UnionStmt);
    root = createUnionPlan((UnionStmt) stmt, analyzer);
  }

  // ...
  boolean sortHasMaterializedSlots = false;
  if (stmt.evaluateOrderBy()) {
    for (SlotDescriptor sortSlotDesc:
        stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
      if (sortSlotDesc.isMaterialized()) {
        sortHasMaterializedSlots = true;
        break;
      }
    }
  }

  if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
    long limit = stmt.getLimit();
    // ...
    boolean useTopN = stmt.hasLimit() && !disableTopN;
    // ...
    root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
        useTopN, stmt.getOffset());
    Preconditions.checkState(root.hasValidStats());
    // ...
  }
  root.setLimit(stmt.getLimit());
  root.computeStats(analyzer);
  return root;
}
```
SingleNodePlanner.createSelectPlan() (SingleNodePlanner.java) creates the tree of PlanNodes implementing the Select/Project/Join/Group by/Having clauses of a select query block:
```java
/**
 * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
 * of the selectStmt query block.
 */
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
    throws ImpalaException {
  // ...
  // No from clause: the select statement only has constant exprs.
  if (selectStmt.getTableRefs().isEmpty()) {
    return createConstantSelectPlan(selectStmt, analyzer);
  }
  // ...
  selectStmt.materializeRequiredSlots(analyzer);

  ArrayList<TupleId> rowTuples = Lists.newArrayList();
  // ...
  for (TableRef tblRef: selectStmt.getTableRefs()) {
    rowTuples.addAll(tblRef.getMaterializedTupleIds());
  }
  // ...
  if (analyzer.hasEmptySpjResultSet()) {
    PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
    emptySetNode.init(analyzer);
    emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
    return createAggregationPlan(selectStmt, analyzer, emptySetNode);
  }
  // ...
  // Create a plan for every table ref.
  List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
  for (TableRef ref: selectStmt.getTableRefs()) {
    PlanNode plan = createTableRefNode(analyzer, ref);
    Preconditions.checkState(plan != null);
    refPlans.add(new Pair(ref, plan));
  }
  // ...
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
  }

  PlanNode root = null;
  if (!selectStmt.getSelectList().isStraightJoin()) {
    Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
    root = createCheapestJoinPlan(analyzer, refPlans);
    if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
  }
  // ...
  if (selectStmt.getSelectList().isStraightJoin() || root == null) {
    // ...
    root = createFromClauseJoinPlan(analyzer, refPlans);
    Preconditions.checkNotNull(root);
  }
  // ...
  if (selectStmt.getAggInfo() != null) {
    root = createAggregationPlan(selectStmt, analyzer, root);
  }
  // ...
  return root;
}
```
The main private methods it calls are createTableRefNode(), createCheapestJoinPlan(), createFromClauseJoinPlan(), and createAggregationPlan(). Their implementations:
createTableRefNode()
```java
/**
 * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
 * CollectionTableRef or an InlineViewRef.
 */
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
    throws ImpalaException {
  if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
    // ...
    return createScanNode(analyzer, tblRef);
  } else if (tblRef instanceof InlineViewRef) {
    // ...
    return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
  }
  throw new InternalException(
      "Unknown TableRef node: " + tblRef.getClass().getSimpleName());
}
```
createCheapestJoinPlan()
```java
/**
 * Returns the lowest-cost plan that materializes the join of all TblRefs in refPlans.
 * Assumes that refPlans are in the same order as the refs originally appeared in
 * the query.
 * Properties:
 * - the plan is executable, ie, all non-cross joins have equi-join predicates
 * - the leftmost scan is over the largest of the inputs for which we can still
 *   construct an executable plan (the largest table goes on the left)
 * - all rhs's (right-hand sides) are in decreasing order of selectiveness (percentage
 *   of rows they eliminate)
 * - outer/cross/semi joins: rhs serialized size is < lhs serialized size (the
 *   right-hand table is smaller than the left one); enforced via join inversion,
 *   if necessary
 * Returns null if we can't create an executable plan.
 */
private PlanNode createCheapestJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  LOG.trace("createCheapestJoinPlan");
  if (refPlans.size() == 1) return refPlans.get(0).second;
  // ...
  // Collect the candidates for the leftmost input along with their materialized size.
  ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    TableRef ref = entry.first;
    JoinOperator joinOp = ref.getJoinOp();
    // ...
    if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
        ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
      continue;
    }
    // ...
    PlanNode plan = entry.second;
    if (plan.getCardinality() == -1) {
      // Missing stats: use 0 as the materialized size.
      candidates.add(new Pair(ref, new Long(0)));
      LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
      continue;
    }
    // ...
    Preconditions.checkNotNull(ref.getDesc());
    long materializedSize =
        (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
    candidates.add(new Pair(ref, new Long(materializedSize)));
    LOG.trace("candidate " + ref.getUniqueAlias() + ": "
        + Long.toString(materializedSize));
  }
  if (candidates.isEmpty()) return null;
  // ...
  // Order the candidates by descending materialized size.
  Collections.sort(candidates,
      new Comparator<Pair<TableRef, Long>>() {
        public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
          long diff = b.second - a.second;
          return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
        }
      });
  // ...
  for (Pair<TableRef, Long> candidate: candidates) {
    PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
    if (result != null) return result;
  }
  return null;
}
```
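The Comparator above orders the leftmost-input candidates by descending materialized size, computed as ceil(avgRowSize × cardinality), so the largest input for which an executable plan can still be built ends up as the leftmost table. Here is a self-contained toy illustration of that heuristic (hypothetical table stats, not Impala classes):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy version of createCheapestJoinPlan()'s candidate ordering: sort by
// materialized size = ceil(avgRowSize * cardinality), descending, so the
// largest table is tried first as the leftmost input of the join plan.
public class LeftmostCandidateDemo {
  record Candidate(String table, long cardinality, double avgRowSize) {
    long materializedSize() {
      return (long) Math.ceil(avgRowSize * (double) cardinality);
    }
  }

  public static void main(String[] args) {
    List<Candidate> candidates = new ArrayList<>(List.of(
        new Candidate("dim_date", 10_000, 32.0),
        new Candidate("sales", 1_000_000_000, 88.0),
        new Candidate("customers", 50_000_000, 120.0)));
    candidates.sort(Comparator.comparingLong(Candidate::materializedSize).reversed());
    for (Candidate c : candidates) {
      System.out.println(c.table() + ": " + c.materializedSize() + " bytes");
    }
    // Prints "sales" first: it becomes the leftmost input of the join plan.
  }
}
```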
createFromClauseJoinPlan()
```java
/**
 * Returns a plan that joins the TblRefs in the order they appear in the FROM clause.
 */
private PlanNode createFromClauseJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  // ...
  Preconditions.checkState(!refPlans.isEmpty());
  PlanNode root = refPlans.get(0).second;
  for (int i = 1; i < refPlans.size(); ++i) {
    TableRef innerRef = refPlans.get(i).first;
    PlanNode innerPlan = refPlans.get(i).second;
    root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
    root.setId(ctx_.getNextNodeId());
  }
  return root;
}
```
createAggregationPlan()
```java
/**
 * Returns a new AggregationNode that materializes the aggregation of the given stmt.
 * Assigns conjuncts from the Having clause to the returned node.
 */
private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
    PlanNode root) throws InternalException {
  Preconditions.checkState(selectStmt.getAggInfo() != null);
  // ...
  AggregateInfo aggInfo = selectStmt.getAggInfo();
  root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
  // ...
  Preconditions.checkState(root.hasValidStats());
  // ...
  // For DISTINCT aggregation, add a second phase of aggregation on top.
  if (aggInfo.isDistinctAgg()) {
    ((AggregationNode) root).unsetNeedsFinalize();
    // ...
    ((AggregationNode) root).setIntermediateTuple();
    root = new AggregationNode(ctx_.getNextNodeId(), root,
        aggInfo.getSecondPhaseDistinctAggInfo());
    // ...
    Preconditions.checkState(root.hasValidStats());
  }
  // ...
  // Assign conjuncts from the Having clause.
  root.assignConjuncts(analyzer);
  return root;
}
```
createCheapestJoinPlan() and createFromClauseJoinPlan() above both rely on createJoinNode() and createJoinPlan():
createJoinNode()
```java
/**
 * Creates a node to join outer with inner. Either the outer or the inner may be a
 * plan created from a table ref (but not both), and the corresponding outer/inner
 * tableRef must not be null.
 */
private PlanNode createJoinNode(
    Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
    TableRef innerRef) throws ImpalaException {
  Preconditions.checkState(innerRef != null ^ outerRef != null);
  TableRef tblRef = (innerRef != null) ? innerRef : outerRef;

  List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
  List<Expr> eqJoinPredicates = Lists.newArrayList();
  // ...
  if (innerRef != null) {
    getHashLookupJoinConjuncts(
        analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
    // ...
    if (!innerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
          eqJoinConjuncts);
    }
  } else {
    getHashLookupJoinConjuncts(
        analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
    // ...
    if (!outerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
          eqJoinConjuncts);
    }
    // Swap the children so that the lhs expr refers to the outer side.
    for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
      Expr swapTmp = eqJoinConjunct.getChild(0);
      eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
      eqJoinConjunct.setChild(1, swapTmp);
    }
  }

  if (eqJoinConjuncts.isEmpty()) {
    // ...
    // Outer and semi joins without equi-join conjuncts are not supported.
    if (tblRef.getJoinOp().isOuterJoin() ||
        tblRef.getJoinOp().isSemiJoin()) {
      throw new NotImplementedException(
          String.format("%s join with '%s' without equi-join " +
              "conjuncts is not supported.",
              tblRef.getJoinOp().isOuterJoin() ? "Outer" : "Semi",
              innerRef.getUniqueAlias()));
    }
    CrossJoinNode result =
        new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
    result.init(analyzer);
    return result;
  }
  // ...
  // A cross join with equi-join conjuncts turns into an inner join.
  if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
    tblRef.setJoinOp(JoinOperator.INNER_JOIN);
  }

  analyzer.markConjunctsAssigned(eqJoinPredicates);

  List<Expr> otherJoinConjuncts = Lists.newArrayList();
  if (tblRef.getJoinOp().isOuterJoin()) {
    // ...
    otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
  } else if (tblRef.getJoinOp().isSemiJoin()) {
    // ...
    otherJoinConjuncts =
        analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
    if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {
      boolean hasNullMatchingEqOperator = false;
      // ...
      Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
      while (it.hasNext()) {
        BinaryPredicate conjunct = it.next();
        if (!conjunct.isNullMatchingEq()) {
          otherJoinConjuncts.add(conjunct);
          it.remove();
        } else {
          // Exactly one null-matching eq conjunct is allowed.
          Preconditions.checkState(!hasNullMatchingEqOperator);
          hasNullMatchingEqOperator = true;
        }
      }
      Preconditions.checkState(hasNullMatchingEqOperator);
    }
  }
  analyzer.markConjunctsAssigned(otherJoinConjuncts);

  HashJoinNode result =
      new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
  result.init(analyzer);
  return result;
}
```
createJoinPlan()
```java
/**
 * Returns a plan with leftmostRef's plan as its leftmost input; the joins
 * are in decreasing order of selectiveness (percentage of rows they eliminate).
 * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
 */
private PlanNode createJoinPlan(
    Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
  // The refs that have yet to be joined.
  List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
  PlanNode root = null;
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    if (entry.first == leftmostRef) {
      root = entry.second;
    } else {
      remainingRefs.add(entry);
    }
  }
  Preconditions.checkNotNull(root);
  // The refs that have been joined so far.
  Set<TableRef> joinedRefs = Sets.newHashSet();
  joinedRefs.add(leftmostRef);

  // If the leftmost ref is an outer/semi/cross join, invert it.
  boolean planHasInvertedJoin = false;
  if (leftmostRef.getJoinOp().isOuterJoin()
      || leftmostRef.getJoinOp().isSemiJoin()
      || leftmostRef.getJoinOp().isCrossJoin()) {
    // ...
    leftmostRef.invertJoin(refPlans, analyzer);
    planHasInvertedJoin = true;
  }

  long numOps = 0;
  int i = 0;
  while (!remainingRefs.isEmpty()) {
    // Identify the next table ref to join.
    PlanNode newRoot = null;
    Pair<TableRef, PlanNode> minEntry = null;
    for (Pair<TableRef, PlanNode> entry: remainingRefs) {
      TableRef ref = entry.first;
      LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());
      // ...
      JoinOperator joinOp = ref.getJoinOp();
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
        currentTids.add(ref.getId());
        // ...
        List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
        if (!currentTids.containsAll(tableRefTupleIds) ||
            !tableRefTupleIds.containsAll(currentTids)) {
          // The tuple ids do not match up; skip this ref for now.
          continue;
        }
      } else if (ref.getJoinOp().isCrossJoin()) {
        if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
      }

      PlanNode rhsPlan = entry.second;
      analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
      // ...
      boolean invertJoin = false;
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
        // Invert the join if the rhs is larger than the lhs.
        long lhsCard = root.getCardinality();
        long rhsCard = rhsPlan.getCardinality();
        if (lhsCard != -1 && rhsCard != -1 &&
            lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
            !joinOp.isNullAwareLeftAntiJoin()) {
          invertJoin = true;
        }
      }
      PlanNode candidate = null;
      if (invertJoin) {
        ref.setJoinOp(ref.getJoinOp().invert());
        candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
        planHasInvertedJoin = true;
      } else {
        candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
      }
      if (candidate == null) continue;
      LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));

      // Outer and semi joins fix the position of the ref in the plan.
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        newRoot = candidate;
        minEntry = entry;
        break;
      }
      // Otherwise prefer the candidate with the lowest cardinality.
      if (newRoot == null
          || (candidate.getClass().equals(newRoot.getClass())
              && candidate.getCardinality() < newRoot.getCardinality())
          || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
        newRoot = candidate;
        minEntry = entry;
      }
    }
    if (newRoot == null) {
      // ...
      Preconditions.checkState(!planHasInvertedJoin);
      return null;
    }
    // ...
    long lhsCardinality = root.getCardinality();
    long rhsCardinality = minEntry.second.getCardinality();
    numOps += lhsCardinality + rhsCardinality;
    LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
        + " #lhs=" + Long.toString(lhsCardinality)
        + " #rhs=" + Long.toString(rhsCardinality)
        + " #ops=" + Long.toString(numOps));
    remainingRefs.remove(minEntry);
    joinedRefs.add(minEntry.first);
    root = newRoot;
    // ...
    root.setId(ctx_.getNextNodeId());
    analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
    ++i;
  }
  return root;
}
```
That covers the rough shape of createSingleNodePlan. Returning to createPlan(), we now look at how the distributed execution plan tree is built, i.e. createPlanFragments.
DistributedPlanner.createPlanFragments() (DistributedPlanner.java) breaks the single-node plan tree into multiple fragments:
```java
/**
 * Fragments are returned in a list such that element i of that list can only
 * consume the output of fragments j > i.
 *
 * TODO: take data partition of the plan fragments into account; in particular,
 * coordinate between hash partitioning for aggregation and hash partitioning
 * for analytic computation more generally than createQueryPlan() does right now
 * (the coordination only happens if the same select block does both the
 * aggregation and analytic computation).
 */
public ArrayList<PlanFragment> createPlanFragments(
    PlanNode singleNodePlan) throws ImpalaException {
  Preconditions.checkState(!ctx_.isSingleNodeExec());
  AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
  QueryStmt queryStmt = ctx_.getQueryStmt();
  ArrayList<PlanFragment> fragments = Lists.newArrayList();
  // For inserts or CTAS, leave the root fragment partitioned unless there is a
  // limit; otherwise merge everything into a single coordinator fragment so the
  // result can be returned to the client.
  boolean isPartitioned = false;
  if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
      && !singleNodePlan.hasLimit()) {
    Preconditions.checkState(!queryStmt.hasOffset());
    isPartitioned = true;
  }
  LOG.debug("create plan fragments");
  long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
  LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
  createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
  return fragments;
}
```
It delegates to the private overload DistributedPlanner.createPlanFragments() (DistributedPlanner.java), which returns the fragments that produce the result of 'root':
```java
/**
 * Returns the fragments that produce the result of 'root'; recursively creates
 * all input fragments of the returned fragment. If a new fragment is created,
 * it is appended to 'fragments', so that a fragment is preceded by all fragments
 * it consumes.
 * If 'isPartitioned' is false, the returned fragment is unpartitioned; otherwise
 * it may be partitioned, depending on whether its inputs are partitioned;
 * the partition function is derived from the inputs.
 */
private PlanFragment createPlanFragments(
    PlanNode root, boolean isPartitioned,
    long perNodeMemLimit, ArrayList<PlanFragment> fragments)
    throws InternalException, NotImplementedException {
  ArrayList<PlanFragment> childFragments = Lists.newArrayList();
  for (PlanNode child: root.getChildren()) {
    // ...
    // Allow child fragments to be partitioned unless they contain a limit clause.
    boolean childIsPartitioned = !child.hasLimit();
    // ...
    childFragments.add(createPlanFragments(
        child, childIsPartitioned, perNodeMemLimit, fragments));
  }

  PlanFragment result = null;
  if (root instanceof ScanNode) {
    result = createScanFragment(root);
    fragments.add(result);
  } else if (root instanceof HashJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createHashJoinFragment(
        (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof CrossJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createCrossJoinFragment(
        (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof SelectNode) {
    result = createSelectNodeFragment((SelectNode) root, childFragments);
  } else if (root instanceof UnionNode) {
    result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
  } else if (root instanceof AggregationNode) {
    result = createAggregationFragment(
        (AggregationNode) root, childFragments.get(0), fragments);
  } else if (root instanceof SortNode) {
    if (((SortNode) root).isAnalyticSort()) {
      // ...
      result = createAnalyticFragment(
          (SortNode) root, childFragments.get(0), fragments);
    } else {
      result = createOrderByFragment(
          (SortNode) root, childFragments.get(0), fragments);
    }
  } else if (root instanceof AnalyticEvalNode) {
    result = createAnalyticFragment(root, childFragments.get(0), fragments);
  } else if (root instanceof EmptySetNode) {
    result = new PlanFragment(
        ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
  } else {
    throw new InternalException(
        "Cannot create plan fragment for this node type: " + root.getExplainString());
  }
  // Move 'result' to the end of the list, after all fragments it consumes.
  fragments.remove(result);
  fragments.add(result);

  // If an unpartitioned result is required but 'result' is partitioned,
  // add a merge fragment on top.
  if (!isPartitioned && result.isPartitioned()) {
    result = createMergeFragment(result);
    fragments.add(result);
  }
  return result;
}
```
The method dispatches to a number of private create*Fragment() helpers; see DistributedPlanner.java for their implementations.
They all return PlanFragment instances; see PlanFragment.java for that class.
This concludes the overview of createPlanFragments.
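The essential invariant of this recursion is ordering: a node's child fragments are appended to the list before the node's own fragment, so every fragment is preceded by the fragments it consumes; Planner.createPlan() then reverses the list so that element i only consumes the output of fragments j > i. A toy model of that bottom-up traversal (hypothetical classes, not Impala's):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of the bottom-up fragment construction: every node's child
// fragments are appended before the node's own fragment, so each fragment
// follows everything it consumes until the final reversal.
public class FragmentOrderDemo {
  record Node(String name, List<Node> children) {}

  static String createFragments(Node node, List<String> fragments) {
    for (Node child : node.children()) {
      createFragments(child, fragments);  // recurse: producers appended first
    }
    String fragment = "Fragment(" + node.name() + ")";
    fragments.add(fragment);              // consumer appended after its inputs
    return fragment;
  }

  public static void main(String[] args) {
    Node scan1 = new Node("scan t1", List.of());
    Node scan2 = new Node("scan t2", List.of());
    Node join = new Node("hash join", List.of(scan1, scan2));
    List<String> fragments = new ArrayList<>();
    createFragments(join, fragments);
    // As in Planner.createPlan(), reverse so the root fragment comes first and
    // element i only consumes the output of fragments j > i.
    Collections.reverse(fragments);
    fragments.forEach(System.out::println);
  }
}
```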
With createSingleNodePlan and createPlanFragments covered, the two most important parts of createPlan are done. Returning to frontend.createExecRequest(), the remaining code is:
```java
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  // Record the destination fragment of each fragment.
  for (int i = 1; i < fragments.size(); ++i) {
    PlanFragment dest = fragments.get(i).getDestFragment();
    Integer idx = fragmentIdx.get(dest);
    Preconditions.checkState(idx != null);
    queryExecRequest.addToDest_fragment_idx(idx.intValue());
  }
  // ...
  // Set the scan ranges/locations of the scan nodes.
  LOG.debug("get scan range locations");
  Set<TTableName> tablesMissingStats = Sets.newTreeSet();
  for (ScanNode scanNode: scanNodes) {
    queryExecRequest.putToPer_node_scan_ranges(
        scanNode.getId().asInt(),
        scanNode.getScanRangeLocations());
    if (scanNode.isTableMissingStats()) {
      tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
    }
  }
  // ...
  queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
  for (TTableName tableName: tablesMissingStats) {
    queryCtx.addToTables_missing_stats(tableName);
  }
  // ...
  // Optionally disable spilling in the backend.
  if (queryCtx.request.query_options.isDisable_unsafe_spills()
      && !tablesMissingStats.isEmpty()
      && !analysisResult.getAnalyzer().hasPlanHints()) {
    queryCtx.setDisable_spilling(true);
  }
  // ...
  try {
    planner.computeResourceReqs(fragments, true, queryExecRequest);
  } catch (Exception e) {
    // ...
    LOG.error("Failed to compute resource requirements for query\n" +
        queryCtx.request.getStmt(), e);
  }
  // ...
  // Serialize the fragments to Thrift.
  for (PlanFragment fragment: fragments) {
    TPlanFragment thriftFragment = fragment.toThrift();
    queryExecRequest.addToFragments(thriftFragment);
  }
  // ...
  TExplainLevel explainLevel = TExplainLevel.VERBOSE;
  // ...
  if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
    explainLevel = queryCtx.request.query_options.getExplain_level();
  }
  // ...
  queryExecRequest.setQuery_ctx(queryCtx);
  // ...
  explainString.append(
      planner.getExplainString(fragments, queryExecRequest, explainLevel));
  queryExecRequest.setQuery_plan(explainString.toString());
  queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());

  String jsonLineageGraph = analysisResult.getJsonLineageGraph();
  if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
    queryExecRequest.setLineage_graph(jsonLineageGraph);
  }
  // ...
  if (analysisResult.isExplainStmt()) {
    // EXPLAIN statement: return just the plan text.
    createExplainRequest(explainString.toString(), result);
    return result;
  }
  // ...
  result.setQuery_exec_request(queryExecRequest);

  if (analysisResult.isQueryStmt()) {
    // Fill in the metadata of the result set.
    LOG.debug("create result set metadata");
    result.stmt_type = TStmtType.QUERY;
    result.query_exec_request.stmt_type = result.stmt_type;
    TResultSetMetadata metadata = new TResultSetMetadata();
    QueryStmt queryStmt = analysisResult.getQueryStmt();
    int colCnt = queryStmt.getColLabels().size();
    for (int i = 0; i < colCnt; ++i) {
      TColumn colDesc = new TColumn();
      colDesc.columnName = queryStmt.getColLabels().get(i);
      colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
      metadata.addToColumns(colDesc);
    }
    result.setResult_set_metadata(metadata);
  } else {
    Preconditions.checkState(analysisResult.isInsertStmt() ||
        analysisResult.isCreateTableAsSelectStmt());
    // ...
    result.stmt_type =
        analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
    result.query_exec_request.stmt_type = TStmtType.DML;
    // ...
    InsertStmt insertStmt = analysisResult.getInsertStmt();
    if (insertStmt.getTargetTable() instanceof HdfsTable) {
      TFinalizeParams finalizeParams = new TFinalizeParams();
      finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
      finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
      finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
      String db = insertStmt.getTargetTableName().getDb();
      finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
      HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
      finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
      finalizeParams.setStaging_dir(
          hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
      queryExecRequest.setFinalize_params(finalizeParams);
    }
  }
  // ...
  validateTableIds(analysisResult.getAnalyzer(), result);

  timeline.markEvent("Planning finished");
  result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
  return result;
}
```
At this point the FE is done: it returns the TExecRequest object to the backend for execution.

Since the author has only recently started working with Impala, this analysis may contain errors; questions and suggestions are welcome.