Flink SQL Gateway REST Endpoint 使用第二弹

Posted JasonLee实时计算

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL Gateway REST Endpoint 使用第二弹相关的知识,希望对你有一定的参考价值。

在上一篇文章中,我们只是使用 Rest API 通过 Flink SQL Gateway 向 standalone 集群提交了 SQL 任务,但是还存在一些问题没有解决,比如如何设置任务的 job name, checkpoint 等配置,以及如何从 checkpoint 恢复任务等,今天这篇文章就来解决这些问题。

SQL Gateway Configuration

在启动 SQL Gateway 的时候我们可以通过 -Dkey = value 给 SQL Gateway 设置一些参数。

./sql-gateway -Dkey=value

./sql-gateway.sh start-foreground \\
-Dsql-gateway.endpoint.rest.address=localhost \\
-Dexecution.checkpointing.interval=30s \\
-Dpipeline.operator-chaining=false \\
-Dpipeline.name=flinksqltest

比如我们可以在这里设置 checkpoint,job name 等配置,但是一个 SQL Gateway 里面是可以运行多个不同 Session 的,一个 Session 里面又可以执行多个 SQL 任务。所以如果我们在 SQL Gateway 级别设置的话,那么所有的 SQL 任务都会走相同的配置,这显然是不符合需求的,我们需要给每个任务单独的设置 checkpoint,job name 等配置,那么应该怎么做呢?

其实可以通过 SET/RESET 来设置,目前 SQL Gateway 支持 DDL/DML/QUERY/SET/RESET/Utils statement 操作,所以我们只需要在提交任务之前通过 SET 来设置相关的配置即可。

SET 命令

"set pipeline.name = jasonlee;"
"set execution.checkpointing.interval = 10s;"
"set state.backend = rocksdb;"
"set state.checkpoints.dir = file:///Users/jasonlee/flink/checkpoint;"
"set execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;"
"set taskmanager.numberOfTaskSlots = 2;"
"SET parallelism.default = 4;"

提交任务

image-20230107211409434

像上面这样给任务设置配置即可,当然了你可以设置 Flink 支持的任何配置,这样我们就可以给单独的 SQL 任务配置参数了。完整提交任务的代码可以参考上一篇文章,这里就不贴了。

Flink UI

image-20230107211503944

可以看到 job name 设置生效了,checkpoint 也正常完成了,然后我们把任务 cancel 掉,指定一个 checkpoint 恢复任务。

只需要通过 SET 命令设置 execution.savepoint.path 路径即可,在启动任务之前,添加下面的配置。

"SET execution.savepoint.path = /Users/jasonlee/flink/checkpoint/c88fc59af8c1d4d838a5dbf4206e2ef0/chk-60;",

从 checkpoint 恢复任务

Flink UI

image-20230221201008225

任务正常拉起,然后再来看下是否是从我们指定的 checkpoint 恢复的。

Flink UI Checkpoint 页面

image-20230107211708484

可以看到任务是从我们指定的 chk-24 恢复的,说明从 checkpoint 恢复任务是可以的,对于 savepoint 来说也是同样的道理。这里就不在测试了。

总结

在 Flink 1.16.0 版本中支持了 SQL Gateway 的方式提交 Flink SQL 任务,这极大的简化了提交 SQL 任务的过程,降低了管理 SQL 任务的门槛,用户可以方便的通过 Rest API  的方式提交和管理 Flink SQL 任务。

推荐阅读

Flink 任务实时监控最佳实践

Flink on yarn 实时日志收集最佳实践

Flink 1.14.0 全新的 Kafka Connector

Flink 1.14.0 消费 kafka 数据自定义反序列化类

Flink SQL JSON Format 源码解析

Flink on yarn 远程调试源码

Flink 通过 State Processor API 实现状态的读取和写入

Flink 侧流输出源码解析

Flink 源码:广播流状态源码解析

Flink 源码分析之 Client 端启动流程分析

Flink Print SQL Connector 添加随机取样功能

Flink SQL Gateway REST Endpoint 使用教程一

如果你觉得文章对你有帮助,麻烦点一下在看吧,你的支持是我创作的最大动力。

以上是关于Flink SQL Gateway REST Endpoint 使用第二弹的主要内容,如果未能解决你的问题,请参考以下文章

Flink SQL Gateway REST Endpoint 使用第二弹

Flink 1.17 Flink-SQL-Gateway HiveServer2 源码分析

apigee gateway、authn 和 authz:REST API 调用另一个 REST API

Flink 监控指南 被动拉取 Rest API

如何将接收到的(承载)访问令牌传递给生成的 REST 客户端,以调用安全的 API-Gateway 端点

使用 Rest API Gateway lambda 集成 CORS 问题进行放大