FLinkSQL+FlinkCDC

Posted 亾甦靈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLinkSQL+FlinkCDC相关的知识,希望对你有一定的参考价值。

Flink


流程:
FlinkCDC读取mysqlBinLog,增删改同步写入Mysql表。
集群模式为Flink的Standalone模式,不安装Hadoop

部署阶段

基础配置

  1. 每台机器防火墙关闭
  2. 配置静态ip地址
  3. 两台机器能互相免密登录
  4. 配置hosts实现主机名映射ip地址
  5. 实现Xshell连接两台机器
  6. 配置xsync分发文件脚本
  7. Mysql需要开启BinLog

核心模块版本选择

安装解压步骤省略,自行解决
Jdk11:因为Flink1.15.3在官网要求最低需要java11
Flink-1.15.3:稳定版
zookeeper-3.7.1:稳定版

不要忘记配置环境变量

配置nfs

详细介绍以及如何配置请点击此处

配置文件中描述:
  nfs用于flink集群高可用模式中checkpoint和savepoint存储metadata的路径,还有zookeeper的相关信息存储

Must be a durable file system that is accessible from all nodes(like HDFS, S3, Ceph, nfs, …)

high-availability.storageDir: file:///nfs/HA
state.checkpoints.dir: file:///nfs/checkPoint
state.savepoints.dir: file:///nfs/savePoint

注意点:

  1. /etc/export路径不起作用,则使用/etc/exports
  2. 每次重启可能需要重新挂载mount -t nfs 192.168..:/nfs /nfs

配置Flink

lib中的jar包

有些可能在部署过程中不起作用,但是没有移除出去

commons-cli-1.5.0.jar             flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar  hadoop-hdfs-3.2.1.jar
flink-cep-1.15.3.jar              flink-shaded-zookeeper-3.5.9.jar                      log4j-1.2-api-2.17.1.jar
flink-connector-files-1.15.3.jar  flink-sql-connector-mysql-cdc-2.3.0.jar               log4j-api-2.17.1.jar
flink-connector-jdbc-1.15.2.jar   flink-table-api-java-uber-1.15.3.jar                  log4j-core-2.17.1.jar
flink-csv-1.15.3.jar              flink-table-planner-loader-1.15.3.jar                 log4j-slf4j-impl-2.17.1.jar
flink-dist-1.15.3.jar             flink-table-runtime-1.15.3.jar                        mysql-connector-java-5.1.49.jar
flink-json-1.15.3.jar             hadoop-client-3.2.1.jar
flink-scala_2.12-1.15.3.jar       hadoop-common-3.2.1.jar

为什么需要上面列出的jar包?

因为以下问题:

  1. FlinkSql无法访问CDC/JDBC

导入以下jar包到$flink_home/lib下
flink-sql-connector-mysql-cdc-2.3.0.jar
flink-connector-jdbc-1.15.2.jar

  1. flinksql如下报错

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.runtime.CalciteException: Non-query expression encountered in illegal context

导入以下jar包到$flink_home/lib下
hadoop-client-3.2.1.jar(重要)
hadoop-hdfs-3.2.1.jar(这个貌似没有用,没测试过)
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar(重要)
hadoop-common-3.2.1.jar(重要)

  1. Exception in thread “main” java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;

集群启动失败
  需要导入commons-cli-1.5.0.jar(https://mvnrepository.com/artifact/commons-cli/commons-cli/1.5.0)

flink-conf.yaml

配置参考

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: localhost
jobmanager.rpc.port: ****
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
#==============================================================================
# High Availability
#==============================================================================
high-availability: zookeeper
high-availability.storageDir: file:///nfs/HA
high-availability.zookeeper.quorum: node1:2181,node2:2181
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.checkpoints.dir: file:///nfs/checkPoint
state.savepoints.dir: file:///nfs/savePoint
jobmanager.execution.failover-strategy: region
#==============================================================================
# Rest & web frontend
#==============================================================================
rest.address: localhost
rest.bind-address: 0.0.0.0
#==============================================================================
# Advanced
#==============================================================================
classloader.resolve-order: parent-first

问题:

1. 为何有些设置项要改为 0.0.0.0 ?

配置文件中描述:To enable this, set the bind-host address to one that has access to an outside facing network interface, such as 0.0.0.0.

许多设置项默认是localhost,Standalone模式下设置为不启用,如果需要启用则改为 0.0.0.0

以下是设置项具体关联内容(在flink Standalone模式下必须设置):

  • rest-bind-address: 0.0.0.0 启用WebUI界面,启动集群后可以在主节点的UI端口访问UI界面
  • taskmanager.bind-host: 0.0.0.0 启用此功能,将绑定主机地址设置为可以访问面向外部的网络的地址
  • jobmanager.bind-host: 0.0.0.0 启用此功能,webUI界面才可显示可用的资源都正常,否则可用资源都显示为0

2. 出现这种错误案例(详细信息请点击此处)怎么办?
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler

  能启动flinksql客户端,但是flinksql insert等操作无法执行问题
配置文件中配置 classloader.resolve-order: parent-first即可解决

workers

192.168.***.***
192.168.***.***

如果配置了hosts可以写成以下形式

node1
node2
(node3)...

masters

  具备高可用需要多台master,同时只有一个JobManager工作,如果JobManager所在机器宕机,集群会启动备用节点,测试时大概需要30秒左右,高可用相关的路径必须是所有集群都能访问到的路径,否则配置无效

192.168.***.***:端口号
192.168.***.***:端口号

配置Zookeeper

根据需要按照官网参考配置

为了方便,需要配置一个启动脚本方便使用

#!/bin/bash

case $1 in
"start")
        for i in node1 node2
        do
             echo ----------zookeeper $i 启动----------
             ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
        done

;;
"stop")
        for i in node1 node2
        do
             echo ----------zookeeper $i 停止----------
             ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
        done

;;
"status")
        for i in node1 node2
        do
             echo ----------zookeeper $i 状态----------
             ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
        done

;;
esac

注意点:

  1. 需要把zookeeper的zoo.cfg文件复制到$flink_home/conf下

运行阶段

流程采用FlinkSQL执行任务

sql-client.sh

官网语法参考

官网sql客户端使用参考

FlinkSql表主键

有效性检查
  SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性
  Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。
  Notes: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

执行sql脚本,使用下面这个命令

sql-client.sh embedded -f [filename]
CREATE TABLE testpk_target(
  id int,
  ts timestamp(3),
  PRIMARY KEY (id) NOT ENFORCED
)WITH(
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/数据库名',
   'table-name' = '表名',
   'username' = '******',
   'password' = '******'
);

CREATE TABLE testpk(
  id int,
  PRIMARY KEY (id) NOT ENFORCED
)WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.***.***',
  'port' = '****',
  'username' = '******',
  'password' = '******',
  'database-name' = '数据库名',
  'table-name' = '表名',
  'debezium.snapshot.mode' = 'initial'
);

insert into testpk_target
select id,now() as ts from testpk;

状态与容错 savepoint/checkpoint 的使用

问题:

  1. 如何从checkpoint/savepoint恢复 ?

首先需要在sql-client界面启用checkpoint,或者配置文件中设置checkpoint生成时间间隔

SET execution.checkpointing.interval = 60s;

在insert等操作执行前设置如下设置

SET execution.savepoint.path = '/opt/module/flink/checkPoint/c318946573f2a1f64824782aeb8af4b4/chk-1';

一旦设置检查点路径,后续sql语句都会启用
需要重置检查点路径,使得后续的sql语句不在从检查点恢复

RESET execution.savepoint.path;
  1. 如何使用savepoint ?

savepoint介绍及其使用方法

  1. flink cancel -s与flink stop -p区别

flink cancel -s:取消任务并生成savepoint,保留checkpoint
flink stop -p:停止任务并生成savepoint,不保留checkpoint

网页端显示任务结束状态不同:
cancel显示为cancelled
stop显示finish

Watermark在FlinkSql中的使用

官网Watermark使用参考

  DataStream Api是选中一个实体类的某个属性作为Watermark的标记
FlinkSql也类似,并且提供以下三种 watermark 策略:

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

  严格递增时间戳:发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到,假设10:00来了2条数据,时间戳都为10:00,第二条数据认为迟到,所以不太合理,下面这个递增时间戳稍微合理一点

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND

  递增时间戳:发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。

  • 有界乱序时间戳(符合生产环境数据的乱序特点): WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit

  有界乱序时间戳: 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一个 5 秒延迟的 watermark 策略。

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

  Watermark 根据配置文件中 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

窗口函数

Apache Flink 提供了几个窗口表值函数 (TVF) 来将表的元素划分为窗口

  当前版本为Flink1.15.3, 官网窗口函数参考

  滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。

  TUMBLE函数采用三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

  1. data:表参数,可以是与时间属性列的任何关系。
  2. timecol:是一个列描述符,将数据的时间属性列映射到翻转窗口。
  3. size:窗口宽度。
  4. offset:可选参数,指定窗口开始偏移量。

  滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

  1. data:表参数,可以是与时间属性列的任何关系。
  2. timecol:是一个列描述符,将数据的时间属性列映射到跳跃窗口。
  3. slide:是指定顺序跳跃窗口开始之间的持续时间的持续时间
  4. size:是指定跳跃窗口宽度的持续时间。
  5. offset:是一个可选参数,用于指定窗口开始偏移的偏移量。

  累积窗口在某些情况下非常有用,例如在固定的窗口间隔内提前触发翻转窗口。例如,每日仪表板从 00:00 到每分钟绘制累积 UV,10:00 的 UV 表示从 00:00 到 10:00 的 UV 总数。通过累积窗口轻松实现。

  例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将获得每天的窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00),[00:00, 24:00)

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

  1. data:表参数,可以是与时间属性列的任何关系。
  2. timecol:列描述符,将数据的时间属性列映射到累积窗口。
  3. step:指定顺序累积窗口结束之间增加的窗口大小的持续时间。
  4. size:指定累积窗口宽度,必须整数倍step
  5. offset:可选参数,指定窗口开始偏移量。
  • Session Windows 会话窗口(即将支持)

  会话窗口(SESSION)通过SESSION活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。

以上是关于FLinkSQL+FlinkCDC的主要内容,如果未能解决你的问题,请参考以下文章

95-910-142-源码-FlinkSQL-FlinkSQL追加模式与缩进模式区别

flinksql的 / 的结果只会保留整数部分,flinksql 不支持 div运算符。hive mysql : / 结果是小数, div 结果只会保留整数部分

大数据(9h)FlinkSQL连MySQLKafka

大数据(9h)FlinkSQL连MySQLKafka

大数据(9h)FlinkSQL连MySQLKafka

FLinkSQL+FlinkCDC