使用Flink1.16.0的SQLGateway迁移Hive SQL任务

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Flink1.16.0的SQLGateway迁移Hive SQL任务相关的知识,希望对你有一定的参考价值。

使用Flink的SQL Gateway迁移Hive SQL任务

前言

我们有数万个离线任务,主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务,当然也有PySpark、打Jar包的Spark和打Jar包的Flink任务这种高成本的任务【Java和Scala都有】。毕竟SQL上手门槛极低,是个人都能写几下并且跑起来,还可以很容易看到run成功的数据长得像不像。其实HQL任务的性能并不会好到哪里去,主要是SQL Boy便宜,无脑堆人天就可以线性提升开发速度。DataPhin的底层基本可以确认就是beeline -f包了一层,而它本身作为二级队列,并不是真正意义上的网关。

我们之前做大数据基础平台时,也有为数据中台租户部署Kyuubi这个网关组件。

Apache Kyuubi:https://kyuubi.apache.org/

这货现在发育的灰常好:

已经不局限于一个霸占Yarn的资源锁定一个Session ID,然后提交Spark任务了。。。这货现在还可以支持Flink和Hudi。。。湖仓一体就需要这货。

燃鹅,新版Flink1.16.0新增了一个和Kyuubi、Spark、Tez抢饭碗的重磅功能:SQL Gateway:

众所周知,Flink的SQL和标准Hive SQL不太一样,新版Flink主动向Hive的dialect看齐:

从而提高了堆HQL的兼容性。官方号称可以97%的HQL任务无需修改直接迁移到Flink!!!还是比较唬人的。

常规的Spark SQL:https://lizhiyong.blog.csdn.net/article/details/120064874

只是让Spark去读Hive9083端口MetaStore的元数据,SQL解析AST、CBO优化和Task执行都是Spark的Catalyst负责。

Hive On Tez【或者MR、Spark】:https://lizhiyong.blog.csdn.net/article/details/123436630

这种方式只是Hive把解析完的任务提交给不同的计算引擎去具体运算。但是很少有听说过Hive On Flink【虽然翻Hive的源码好像可以去实现它】。

所以本文重点就是这个Hive On Flink。用流批一体的运算引擎去跑批也是个有趣的事情。有生之年有望看到Flink一统江湖了。。。

Hive On Flink原理

新增的支持

Hive任务能使用Flink来跑,Flink当然是做了很多支持:

Hive的MetaStore在大数据领域的地位相当于K8S在云原生容器编排领域的地位,或者Alluxio在云原生存算分离架构统一存储层的地位,都是事实上的标准了。能解析Hive的Metastore就可以管理Hadoop集群绝大多数的Hive表了。。。当然Hudi的一些表、Flink的一些SQL流式表也可能被管控到。

而支持Hive的UDF,天然就拥有了Hive的那几百个系统函数:https://lizhiyong.blog.csdn.net/article/details/127501392

当然就可以减少很多写UDF的平台组件二开攻城狮或者部分资深SQL Boy的工作量。UDF函数们是公司的资产,轻易不可以弃用的。

作为一个运算引擎,在Source端和Sink端都支持流式和批式操作Hive表,毫不意外。还可以自动小文件合并,有点像Hudi的Merge On Read这种写多读少的模式了。

SQL解析

在SQL Boy们眼里最重要的SQL,其实在Java和C#种也就是个普通的String字符串,走JDBC传参或者ADO.NET,如果是开发个AD Hoc即席查询平台,单从功能角度,其实都不需要关心租户们传的select语句的具体内容。但是执行引擎必须能把SQL字符串给解析成具体的执行计划或者底层任务。

Flink1.16.0使用了这么一个可插拔的插件,将HQL解析为Logical Plan逻辑计划。后续的ROB、CBO优化生成Physical Plan物理计划,还有转换为Flink最终的Job Graph都是与普通的Blink执行套路一致。

效果

可以满足大部分应用场景了。

命令行和API、运行时、底层资源调度,都可以实现一致,运维起来应该要方便不少。

Gateway

Flink自带了Flink SQL Gateway,显而易见的好处是平台和组件二开人员不需要去自己写Gateway去Dispatch分发任务了,甚至二级调度都可以省了。。。

本身后端就可以多租户了。。。还可以支持多种Cluster,K8S和Yarn或者Docker的Standalone混合云考虑一下???

前端支持Rest和Hive Server2,对Java开发人员和SQL Boy们都很友好。

HS2Endpoint

有点区别:

优势

尤其是处理异构数据源:

优势很明显。做联邦查询的改动也只是需要+个Catalog。

Demo

FFA2022的罗宇侠&方盛凯两位大佬带来个Demo,展示了Flink如何使用Hive和Flink的dialect分别按流式和批式跑任务。

为了方便查看,笔者手动敲出来了:

流式

建表:

--创建目标表
create table if not exists dwd_category_by_day(
	`i_category` string,
	`cate_sales` double,
	`cayehory_day_order_cnt` bigint
) 
partitioned by (
	`year` bigint,
	`day` bigint
)
TBLPROPERTIES(
	'sink.partition-commit.policy.kind'='metastore,success-file'
)
;

--创建源表
set table.sql-dialect=default;

create table if not exists s_dwd_store_sales(
	`ss_item_sk` bigint,
	`i_brand` string,
	`i_class` string,
	`i_category` string,
	`ss_sales_price` double,
	`d_date` date,
	`d_timestamp` as cast(d_date as timestamp(3)),
	watermark for `d_timestamp` as `d_timestamp`
) with (
	'connector'='kafka',
	'topic'='dwd_store_sales',
	'properties.bootstrap.servers'='192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092',
	'properties.group.id'='FFA',
	'key.fields'='ss_item_sk',
	'scan.startup_mode'='earlist-offset',
	'key.format'='json',
)
;

根据Demo的建表DDL,可以看出按照Hive语法建表时,Flink需要设置表的属性。

而使用传统Flink的语法建流式表时,反倒需要手动指定dialect。说明默认的dialect其实是:

set table.sql-dialect=hive;

每日类销量以及订单数统计:

set table.sql-dialect=default;
set execution.runtime-mode=streaming;
set table.cml-sync=false;--异步提交作业

--开启检查点
set execution.checkpointing.interval=30s;

insert into dwd_category_by_day
select
	i_category,
	sum(ss_sales_price) as month_sales,
	count(1) as order_cnt,
	year(window_start) as `year`,
	dayofyear(window_start) as `day`
from TABLE(
	TUMBLE(
		TABLE s_dwd_store_sales,DESCRIPTOR(d_timestamp),INTERVAL '1' DAY
	)
)
group by
	window_start,
	window_end,
	i_category
;

流式的SQL需要设置滑动的时间窗口,貌似没啥子毛病。

销量最佳Top3:

set table.sql_dialect=default;

select
	i_category,
	categoru_day_order_cnt,
	rownum
from(
	select
		i_category,
		categoru_day_order_cnt,
		row_number() over (order by categoru_day_order_cnt desc) as rownum
	from
		dwd_category_by_day
)
where
	rownum<=3
;

Flink的SQL不用像Hive的SQL那样每个子查询都要起别名【Spark SQL也不用】,太棒了!!!

可以看到流式的SQL任务,开发成本肯定比Java和Scala写DataStreaming算子低!!!利好SQL Boy。

批式

desc tpcds_bin_orc_2.dwd_store_sales;

这个表2位大佬已经灌过数据,根据表结构,笔者大概知道大概也是长这样:

create table if not exists tpcds_bin_orc_2.dwd_store_sales(
	`ss_item_sk` bigint,
	`i_brand` string,
	`i_class` string,
	`i_category` string,
	`ss_sales_price` double
)
partitioned by (
	`d_date` date
)
;

每日大类销量以及订单数统计:

insert overwrite dwd_category_by_day
select
	i_category,
	sum(ss_sales_price) as month_sales,
	count(1) as order_cnt,
	year(d_date) as `year`,
	datediff(d_date,concat(year(d_date)-1,'-12-31'))
from
	tpcds_bin_orc_2.dwd_store_sales
group by
	year(d_date),
	datediff(d_date,concat(year(d_date)-1,'-12-31')),
	i_category
;

销量最佳Top3:

select
	i_category,
	categoru_day_order_cnt,
	rownum
from(
	select
		i_category,
		categoru_day_order_cnt,
		row_number() over (order by categoru_day_order_cnt desc) as rownum
	from
		dwd_category_by_day
)
where
	rownum<=3
;

可以看到批式的SQL任务由于数据不会在运算时发生变化,不用考虑各种事件时间和水位线还有滑动时间窗口,直接替换即可,更简单!!!

宣传的97%HQL任务可以不加改动,直接迁移到Flink,还算有希望的。不过底层做了什么惊天地泣鬼神的大事,对于只会写业务脚本的SQL Boy们来说,也无关痛痒。

Github参考资料

Flink sql Gateway有个Github地址:https://github.com/ververica/flink-sql-gateway

作者Ververica:https://www.ververica.com/

它就是Flink的公司。

Github的这个Flink sql Gateway貌似很久没有更新了。。。但是它毕竟只是与BE交互的FE,还是可以参考。

启动Gateway

./bin/sql-gateway.sh -h

The following options are available:
     -d,--defaults <default configuration file>   The properties with which every new session is initialized. 
                                                  Properties might be overwritten by session properties.
     -h,--help                                    Show the help message with descriptions of all options.
     -j,--jar <JAR file>                          A JAR file to be imported into the session. 
                                                  The file might contain user-defined classes needed for 
                                                  statements such as functions, the execution of table sources,
                                                  or sinks. Can be used multiple times.
     -l,--library <JAR directory>                 A JAR file directory with which every new session is initialized. 
                                                  The files might contain user-defined classes needed for 
                                                  the execution of statements such as functions,
                                                  table sources, or sinks. Can be used multiple times.
     -p,--port <service port>                     The port to which the REST client connects to.

下Flink集群有这个角标。

典型的yaml

默认的配置文件:

# Define server properties.

server:
  bind-address: 127.0.0.1           # optional: The address that the gateway binds itself (127.0.0.1 by default)
  address: 127.0.0.1                # optional: The address that should be used by clients to connect to the gateway (127.0.0.1 by default)
  port: 8083                        # optional: The port that the client connects to  (8083 by default)
  jvm_args: "-Xmx2018m -Xms1024m"   # optional: The JVM args for SQL gateway process


# Define session properties.

session:
  idle-timeout: 1d                  # optional: Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero. The minimum unit is in milliseconds. (1d by default)
  check-interval: 1h                # optional: The check interval for session idle timeout, which can be disabled by setting to zero. The minimum unit is in milliseconds. (1h by default)
  max-count: 1000000                # optional: Max count of active sessions, which can be disabled by setting to zero. (1000000 by default)


# Define tables here such as sources, sinks, views, or temporal tables.

tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Define user-defined functions here.

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

# Define available catalogs

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     hive-conf-dir: ...
   - name: catalog_2
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: ...
     hive-version: 1.2.1


# Properties that change the fundamental execution behavior of a table program.

execution:
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  current-catalog: catalog_1        # optional: name of the current catalog of the session ('default_catalog' by default)
  current-database: mydb1           # optional: name of the current database of the current catalog
                                    #   (default database of the current catalog by default)


# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

支持的语法

statementcomment
SHOW CATALOGSList all registered catalogs
SHOW DATABASESList all databases in the current catalog
SHOW TABLESList all tables and views in the current database of the current catalog
SHOW VIEWSList all views in the current database of the current catalog
SHOW FUNCTIONSList all functions
SHOW MODULESList all modules
USE CATALOG catalog_nameSet a catalog with given name as the current catalog
USE database_nameSet a database with given name as the current database of the current catalog
CREATE TABLE table_name …Create a table with a DDL statement
DROP TABLE table_nameDrop a table with given name
ALTER TABLE table_nameAlter a table with given name
CREATE DATABASE database_name …Create a database in current catalog with given name
DROP DATABASE database_name …Drop a database with given name
ALTER DATABASE database_name …Alter a database with given name
CREATE VIEW view_name AS …Add a view in current session with SELECT statement
DROP VIEW view_name …Drop a table with given name
SET xx=yySet given key’s session property to the specific value
SETList all session’s properties
RESET ALLReset all session’s properties set by SET command
DESCRIBE table_nameShow the schema of a table
EXPLAIN PLAN FOR …Show string-based explanation about AST and execution plan of the given statement
SELECT …Submit a Flink SELECT SQL job
INSERT INTO …Submit a Flink INSERT INTO SQL job
INSERT OVERWRITE …Submit a Flink INSERT OVERWRITE SQL job

功能还算齐全。

Beeline

beeline> !connect jdbc:flink://localhost:8083?planner=blink

Beeline version 2.2.0 by Apache Hive
beeline> !connect jdbc:flink://localhost:8083?planner=blink
Connecting to jdbc:flink://localhost:8083?planner=blink
Enter username for jdbc:flink://localhost:8083?planner=blink: 
Enter password for jdbc:flink://localhost:8083?planner=blink: 
Connected to: Apache Flink (version 1.10.0)
Driver: Flink Driver (version 0.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:flink://localhost:8083> CREATE TABLE T(
. . . . . . . . . . . . . . . >   a INT,
. . . . . . . . . . . . . . . >   b VARCHAR(10)
. . . . . . . . . . . . . . . > ) WITH (
. . . . . . . . . . . . . . . >   'connector.type' = 'filesystem',
. . . . . . . . . . . . . . . >   'connector.path' = 'file:///tmp/T.csv',
. . . . . . . . . . . . . . . >   'format.type' = 'csv',
. . . . . . . . . . . . . . . >   'format.derive-schema' = 'true'
. . . . . . . . . . . . . . . > );
No rows affected (0.158 seconds)
0: jdbc:flink://localhost:8083> INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello');
No rows affected (4.747 seconds)
0: jdbc:flink://localhost:8083> SELECT * FROM T;
+----+--------+--+
| a  |   b    |
+----+--------+--+
| 1  | Hi     |
| 2  | Hello  |
+----+--------+--+
2 rows selected (0.994 seconds)
0: jdbc:flink://localhost:8083> 

这是比较老的语法了,传统的Flink SQL。

JDBC

当然可以使用Java走JDBC调用:

Jar包:https://github.com/ververica/flink-jdbc-driver/releases

Demo:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Sample 
	public static void main(String[] args) throws Exception 
		Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
		Statement statement = connection.createStatement();

		statement.executeUpdate("CREATE TABLE T(\\n" +
			"  a INT,\\n" +
			"  b VARCHAR(10)\\n" +
			") WITH (\\n" +
			"  'connector.type' = 'filesystem',\\n" +
			"  'connector.path' = 'file:///tmp/T.csv',\\n" +
			"  'format.type' = 'csv',\\n" +
			"  'format.derive-schema' = 'true'\\n" +
			")");
		statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')");
		ResultSet rs = statement.executeQuery("SELECT * FROM T");
		while (rs.next()) 
			System.out.println(rs.getInt(1) + ", " + rs.getString(2));
		

		statement.close();
		connection.close();
	

传统的Flink SQL就是这么写。。。相当古老了。。。

Shell脚本

启动sql gateway的shell较新版本:

function usage() 
  echo "Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]"
  echo "  commands:"
  echo "    start               - Run a SQL Gateway as a daemon"
  echo "    start-foreground    - Run a SQL Gateway as a console application"
  echo "    stop                - Stop the SQL Gateway daemon"
  echo "    stop-all            - Stop all the SQL Gateway daemons"
  echo "    -h | --help         - Show this help message"


################################################################################
# Adopted from "flink" bash script
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \\(.*\\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

################################################################################
# SQL gateway specific logic
################################################################################

ENTRYPOINT=sql-gateway

if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then
  usage
  exit 0
fi

STARTSTOP=$1

if [ -z "$STARTSTOP<

以上是关于使用Flink1.16.0的SQLGateway迁移Hive SQL任务的主要内容,如果未能解决你的问题,请参考以下文章

使用Flink1.16.0的SQLGateway迁移Hive SQL任务

Flink SQL Gateway REST Endpoint 使用教程

Flink SQL Gateway REST Endpoint 使用教程

EBS开发附件上传和下载功能(转)

Oracle数据库部分迁至闪存存储方案

svn迁git保留commit记录