解析和校验Flink SQL语句
Posted 白夜鸦羽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解析和校验Flink SQL语句相关的知识,希望对你有一定的参考价值。
Flink对SQL的支持是基于Apache Calcite实现的,且Flink包中集成了Apache Calcite,所以我们可以直接调用Flink包中的SQL解析类,来解析和校验我们的Flink SQL语句。
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
/**
* 解析&校验 Flink SQL语句
*
* @param sql 一整段字符串sql
* @return sql语句list
*/
public static List<String> parseFlinkSql(String sql)
List<String> sqlList = new ArrayList<>();
if (sql != null && !sql.isEmpty())
try
SqlParser parser = SqlParser.create(sql, SqlParser.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setQuoting(BACK_TICK)
.setUnquotedCasing(Casing.TO_LOWER) //字段名统一转化为小写
.setQuotedCasing(Casing.UNCHANGED)
.setConformance(FlinkSqlConformance.DEFAULT)
.build()
);
List<SqlNode> sqlNodeList = parser.parseStmtList().getList();
if (sqlNodeList != null && !sqlNodeList.isEmpty())
for (SqlNode sqlNode : sqlNodeList)
sqlList.add(sqlNode.toString());
catch (Exception e)
e.printStackTrace();
return sqlList;
如果只进行Flink SQL的解析和校验,可以引入下面的包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parser</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-linq4j</artifactId>
<version>1.26.0</version>
</dependency>
如果是Flink项目,则flink-table-planner-blink这个包自带SQL解析功能,无需额外引入其他包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.12.0</version>
</dependency>
以上是关于解析和校验Flink SQL语句的主要内容,如果未能解决你的问题,请参考以下文章