FLinkSQL+FlinkCDC
Posted 亾甦靈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLinkSQL+FlinkCDC相关的知识,希望对你有一定的参考价值。
Flink
流程:
FlinkCDC读取mysqlBinLog,增删改同步写入Mysql表。
集群模式为Flink的Standalone模式,不安装Hadoop
部署阶段
基础配置
- 每台机器防火墙关闭
- 配置静态ip地址
- 两台机器能互相免密登录
- 配置hosts实现主机名映射ip地址
- 实现Xshell连接两台机器
- 配置xsync分发文件脚本
- Mysql需要开启BinLog
核心模块版本选择
安装解压步骤省略,自行解决
Jdk11:因为Flink1.15.3在官网要求最低需要java11
Flink-1.15.3:稳定版
zookeeper-3.7.1:稳定版
不要忘记配置环境变量
配置nfs
配置文件中描述:
nfs用于flink集群高可用模式中checkpoint和savepoint存储metadata的路径,还有zookeeper的相关信息存储
Must be a durable file system that is accessible from all nodes(like HDFS, S3, Ceph, nfs, …)
high-availability.storageDir: file:///nfs/HA
state.checkpoints.dir: file:///nfs/checkPoint
state.savepoints.dir: file:///nfs/savePoint
注意点:
- /etc/export路径不起作用,则使用/etc/exports
- 每次重启可能需要重新挂载mount -t nfs 192.168..:/nfs /nfs
配置Flink
lib中的jar包
有些可能在部署过程中不起作用,但是没有移除出去
commons-cli-1.5.0.jar flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar hadoop-hdfs-3.2.1.jar
flink-cep-1.15.3.jar flink-shaded-zookeeper-3.5.9.jar log4j-1.2-api-2.17.1.jar
flink-connector-files-1.15.3.jar flink-sql-connector-mysql-cdc-2.3.0.jar log4j-api-2.17.1.jar
flink-connector-jdbc-1.15.2.jar flink-table-api-java-uber-1.15.3.jar log4j-core-2.17.1.jar
flink-csv-1.15.3.jar flink-table-planner-loader-1.15.3.jar log4j-slf4j-impl-2.17.1.jar
flink-dist-1.15.3.jar flink-table-runtime-1.15.3.jar mysql-connector-java-5.1.49.jar
flink-json-1.15.3.jar hadoop-client-3.2.1.jar
flink-scala_2.12-1.15.3.jar hadoop-common-3.2.1.jar
为什么需要上面列出的jar包?
因为以下问题:
- FlinkSql无法访问CDC/JDBC
导入以下jar包到$flink_home/lib下
flink-sql-connector-mysql-cdc-2.3.0.jar
flink-connector-jdbc-1.15.2.jar
- flinksql如下报错
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context
导入以下jar包到$flink_home/lib下
hadoop-client-3.2.1.jar(重要)
hadoop-hdfs-3.2.1.jar(这个貌似没有用,没测试过)
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar(重要)
hadoop-common-3.2.1.jar(重要)
- Exception in thread “main” java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
集群启动失败
需要导入commons-cli-1.5.0.jar(https://mvnrepository.com/artifact/commons-cli/commons-cli/1.5.0)
flink-conf.yaml
#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: localhost
jobmanager.rpc.port: ****
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
#==============================================================================
# High Availability
#==============================================================================
high-availability: zookeeper
high-availability.storageDir: file:///nfs/HA
high-availability.zookeeper.quorum: node1:2181,node2:2181
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.checkpoints.dir: file:///nfs/checkPoint
state.savepoints.dir: file:///nfs/savePoint
jobmanager.execution.failover-strategy: region
#==============================================================================
# Rest & web frontend
#==============================================================================
rest.address: localhost
rest.bind-address: 0.0.0.0
#==============================================================================
# Advanced
#==============================================================================
classloader.resolve-order: parent-first
问题:
1. 为何有些设置项要改为 0.0.0.0 ?
配置文件中描述:To enable this, set the bind-host address to one that has access to an outside facing network interface, such as 0.0.0.0.
许多设置项默认是localhost,Standalone模式下设置为不启用,如果需要启用则改为 0.0.0.0
以下是设置项具体关联内容(在flink Standalone模式下必须设置):
- rest-bind-address: 0.0.0.0 启用WebUI界面,启动集群后可以在主节点的UI端口访问UI界面
- taskmanager.bind-host: 0.0.0.0 启用此功能,将绑定主机地址设置为可以访问面向外部的网络的地址
- jobmanager.bind-host: 0.0.0.0 启用此功能,webUI界面才可显示可用的资源都正常,否则可用资源都显示为0
2. 出现这种错误案例(详细信息请点击此处)怎么办?
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
能启动flinksql客户端,但是flinksql insert等操作无法执行问题
配置文件中配置 classloader.resolve-order: parent-first即可解决
workers
192.168.***.***
192.168.***.***
如果配置了hosts可以写成以下形式
node1
node2
(node3)...
masters
具备高可用需要多台master,同时只有一个JobManager工作,如果JobManager所在机器宕机,集群会启动备用节点,测试时大概需要30秒左右,高可用相关的路径必须是所有集群都能访问到的路径,否则配置无效
192.168.***.***:端口号
192.168.***.***:端口号
配置Zookeeper
为了方便,需要配置一个启动脚本方便使用
#!/bin/bash
case $1 in
"start")
for i in node1 node2
do
echo ----------zookeeper $i 启动----------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
done
;;
"stop")
for i in node1 node2
do
echo ----------zookeeper $i 停止----------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
done
;;
"status")
for i in node1 node2
do
echo ----------zookeeper $i 状态----------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
done
;;
esac
注意点:
- 需要把zookeeper的zoo.cfg文件复制到$flink_home/conf下
运行阶段
流程采用FlinkSQL执行任务
sql-client.sh
FlinkSql表主键
有效性检查
SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。
Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。
Notes: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。
执行sql脚本,使用下面这个命令
sql-client.sh embedded -f [filename]
CREATE TABLE testpk_target(
id int,
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/数据库名',
'table-name' = '表名',
'username' = '******',
'password' = '******'
);
CREATE TABLE testpk(
id int,
PRIMARY KEY (id) NOT ENFORCED
)WITH(
'connector' = 'mysql-cdc',
'hostname' = '192.168.***.***',
'port' = '****',
'username' = '******',
'password' = '******',
'database-name' = '数据库名',
'table-name' = '表名',
'debezium.snapshot.mode' = 'initial'
);
insert into testpk_target
select id,now() as ts from testpk;
状态与容错 savepoint/checkpoint 的使用
问题:
- 如何从checkpoint/savepoint恢复 ?
首先需要在sql-client界面启用checkpoint,或者配置文件中设置checkpoint生成时间间隔
SET execution.checkpointing.interval = 60s;
在insert等操作执行前设置如下设置
SET execution.savepoint.path = '/opt/module/flink/checkPoint/c318946573f2a1f64824782aeb8af4b4/chk-1';
一旦设置检查点路径,后续sql语句都会启用
需要重置检查点路径,使得后续的sql语句不在从检查点恢复
RESET execution.savepoint.path;
- 如何使用savepoint ?
- flink cancel -s与flink stop -p区别
flink cancel -s:取消任务并生成savepoint,保留checkpoint
flink stop -p:停止任务并生成savepoint,不保留checkpoint
网页端显示任务结束状态不同:
cancel显示为cancelled
stop显示finish
Watermark在FlinkSql中的使用
DataStream Api是选中一个实体类的某个属性作为Watermark的标记
FlinkSql也类似,并且提供以下三种 watermark 策略:
- 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column
严格递增时间戳:发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到,假设10:00来了2条数据,时间戳都为10:00,第二条数据认为迟到,所以不太合理,下面这个递增时间戳稍微合理一点
- 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND
递增时间戳:发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。
- 有界乱序时间戳(符合生产环境数据的乱序特点): WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit
有界乱序时间戳: 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一个 5 秒延迟的 watermark 策略。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
Watermark 根据配置文件中 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
窗口函数
Apache Flink 提供了几个窗口表值函数 (TVF) 来将表的元素划分为窗口
当前版本为Flink1.15.3, 官网窗口函数参考
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。
TUMBLE函数采用三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
- data:表参数,可以是与时间属性列的任何关系。
- timecol:是一个列描述符,将数据的时间属性列映射到翻转窗口。
- size:窗口宽度。
- offset:可选参数,指定窗口开始偏移量。
滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data:表参数,可以是与时间属性列的任何关系。
- timecol:是一个列描述符,将数据的时间属性列映射到跳跃窗口。
- slide:是指定顺序跳跃窗口开始之间的持续时间的持续时间
- size:是指定跳跃窗口宽度的持续时间。
- offset:是一个可选参数,用于指定窗口开始偏移的偏移量。
累积窗口在某些情况下非常有用,例如在固定的窗口间隔内提前触发翻转窗口。例如,每日仪表板从 00:00 到每分钟绘制累积 UV,10:00 的 UV 表示从 00:00 到 10:00 的 UV 总数。通过累积窗口轻松实现。
例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将获得每天的窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00),[00:00, 24:00)
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data:表参数,可以是与时间属性列的任何关系。
- timecol:列描述符,将数据的时间属性列映射到累积窗口。
- step:指定顺序累积窗口结束之间增加的窗口大小的持续时间。
- size:指定累积窗口宽度,必须整数倍step
- offset:可选参数,指定窗口开始偏移量。
- Session Windows 会话窗口(即将支持)
会话窗口(SESSION)通过SESSION活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。
以上是关于FLinkSQL+FlinkCDC的主要内容,如果未能解决你的问题,请参考以下文章
95-910-142-源码-FlinkSQL-FlinkSQL追加模式与缩进模式区别
flinksql的 / 的结果只会保留整数部分,flinksql 不支持 div运算符。hive mysql : / 结果是小数, div 结果只会保留整数部分