使用Binlog+FlinkCDC实时监控数据

Posted 追风少年zzq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Binlog+FlinkCDC实时监控数据相关的知识,希望对你有一定的参考价值。

一、mysql的Binlog

1、什么是Binlog

1)binlog是二进制日志,并且是事务安全性

2)binlog记录了所有的DDL和DML(除了数据查询语句)语句,并以事件的形式记录,还包含语句所执行的消耗的时间

3)一般来说开启二进制日志大概会有1%的性能损耗。

2、Binlog使用场景

1)使用binlog恢复数据

2)在项目中动态监听mysql中变化的数据

3、Binlog开启

1)在MySQL的配置文件(Linux: /etc/my.cnf , Windows:\\my.ini)下,修改配置在[mysqld] 区块设置/添加

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2019
binlog-do-db=gmall2020
binlog-do-db=gmall2021

2)重启mysql

sudo systemctl restart mysqld

4、配置文件参数解析

配置机器id

多台机器不能重复

server-id=1

开启binlog

log-bin=mysql-bin

Binlog分类设置

MySQL Binlog的格式,那就是有三种,分别是STATEMENT,MIXED,ROW。

在配置文件中选择配置,一般会配置为row

binlog_format=row

三种分类的区别:

1)statement

​ 语句级,binlog会记录每次一执行写操作的语句。

​ 相对row模式节省空间,但是可能产生不一致性,比如

​ update tt set create_date=now()

​ 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

​ 优点:节省空间

​ 缺点:有可能造成数据不一致。

2)row(常用)

​ 行级,binlog会记录每次操作后每行记录的变化。

​ 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。

​ 缺点:占用较大空间。

3)mixed

​ statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题

    在某些情况下譬如:

​ 当函数中包含 UUID() 时;

    包含 AUTO_INCREMENT 字段的表被更新时;

    执行 INSERT DELAYED 语句时;

    用 UDF 时;

    会按照 ROW的方式进行处理

​ 优点:节省空间,同时兼顾了一定的一致性。

​ 缺点:还有些极个别情况依旧会造成不一致,

​ 另外statement和mixed对于需要对binlog的监控的情况都不方便。

设置数据库

设置要监听的数据库,可以同时写入多个库

binlog-do-db=gmall2021
binlog-do-db=gmall2022
binlog-do-db=gmall2023

二、FlinkCDC

1、什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2、CDC的种类

CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3、FlinkCDC

Flink内置了Debezium

FlinkCDC1.11版本正式发布

Canal不支持读取全量binlog数据,而FlinkCDC完美避开了这个问题

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/...

3.CDC案例实操

1)导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>

<dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2)编写代码

package com.haoziqi;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**************************************************************
 * @Author: haoziqi
 * @Date: Created in 9:27 2021/3/15
 * @Description: TODO 使用DataStream连接mysql,并监控表中新增的数据  测试通道是否正常:flink读取mysql  binlog数据
 * 执行的时候需要检查对应的库是否存在
 *   linux中:sudo vim /etc/my.cnf
 *   2、执行的时候需要运行hdfs
 *   3、启动mysql,
 *
 **************************************************************/
public class FlinkCDC1 {
    private static Properties properties;

    public static void main(String[] args) throws Exception {
        //TODO 1.获取流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1Checkpoint相关
/*读取的是binlog中的数据,如果集群挂掉,尽量能实现断点续传功能。如果从最新的读取(丢数据)。如果从最开始读(重复数据)。理想状态:读取binlog中的数据读一行,保存一次读取到的(读取到的行)位置信息。而flink中读取行位置信息保存在Checkpoint中。使用Checkpoint可以把flink中读取(按行)的位置信息保存在Checkpoint中*/
        env.enableCheckpointing(5000L);//5s执行一次Checkpoint
        //设置Checkpoint的模式:精准一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //任务挂掉的时候是否清理checkpoint。使任务正常退出时不删除CK内容,有助于任务恢复。默认的是取消的时候清空checkpoint中的数据。RETAIN_ON_CANCELLATION表示取消任务的时候,保存最后一次的checkpoint。便于任务的重启和恢复,正常情况下都使用RETAIN
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //设置一个重启策略:默认的固定延时重启次数,重启的次数是Integer的最大值,重启的间隔是1s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //设置一个状态后端 jobManager。如果使用的yarn集群模式,jobManager随着任务的生成而生成,任务挂了jobManager就没了。因此需要启动一个状态后端。只要设置checkpoint,尽量就设置一个状态后端。保存在各个节点都能读取的位置:hdfs中
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck/"));
        //指定用户
        System.setProperty("HADOOP_USER_NAME", "atguigu");



        //TODO 2.读取mysql变化数据 监控MySQL中变化的数据
        Properties properties = new Properties(); //创建一个变量可以添加之后想添加的配置信息
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder() //使用builder创建MySQLsource对象,需要指定对象的泛型。
                .hostname("hadoop102") //指定监控的哪台服务器(MySQL安装的位置)
                .port(3306) //MySQL连接的端口号
                .username("root") //用户
                .password("123456")//密码
                .databaseList("gmall_flink_0923") //list:可以监控多个库
                .tableList("gmall_flink_0923.z_user_info") //如果不写则监控库下的所有表,需要使用【库名.表名】
                //debezium中有很多配置信息。可以创建一个对象来接收
                //.debeziumProperties(properties)
                .deserializer(new StringDebeziumDeserializationSchema()) //读的数据是binlog文件,反序列化器,解析数据
                .startupOptions(StartupOptions.initial()) //初始化数据:空值读不读数据库中的历史数据。initial(历史+连接之后的)、latest-offset(连接之后的)。timestamp(根据指定时间戳作为开始读取的位置)
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //TODO 3.打印数据
        streamSource.print();
        //把上面代码注释掉,报错代码

        SingleOutputStreamOperator<String> map = streamSource.map(data -> data);
        SingleOutputStreamOperator<String> slotgroup = map.slotSharingGroup("123");

        slotgroup.print();



        //TODO 4.启动任务

        env.execute();
    }
}

3) 案例测试:

1)打包成带依赖的jar包

2)开启MySQLbinlog并重启Mysql

4) 启动HDFS集群+yarn

start-yarn.sh
start-dfs.sh

5)启动程序(基于yarn的pre-job模式)

bin/flink run -t yarn-per-job -c com.haoziqi.FlinkCDC1 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6)在MySQL的gmall-flink.z_user_info表中添加、修改或者删除数据

7)在控制台查看输出

4)CDC数据格式转换(必看)

经过上面的数据采集,我们得到一份SourceRecord格式的数据

SourceRecord{
    sourcePartition={server=mysql_binlog_source},
    sourceOffset={ts_sec=1616030398, file=mysql-bin.000009, pos=519, row=1, server_id=1, event=2}
    }
ConnectRecord{
    topic=\'mysql_binlog_source.gmall_flink_0923.z_user_info\', kafkaPartition=null, key=Struct{id=8}, 

    keySchema=Schema{
    mysql_binlog_source.gmall_flink_0923.z_user_info.Key:STRUCT
    }, 

value=Struct{
        before=Struct{id=8,name=haoziqi},
        after=Struct{id=8,name=haoziqi,phone_num=123456},
        source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1616030398000,db=gmall_flink_0923,table=z_user_info,server_id=1,file=mysql-bin.000009,pos=675,row=0,thread=2
        },op=u,ts_ms=1616030399282
        },
    valueSchema=Schema{
        mysql_binlog_source.gmall_flink_0923.z_user_info.Envelope:STRUCT
        
        },
     timestamp=null, headers=ConnectHeaders(headers=)
}

在上面获取到的数据中,我们只需要获取到更新后的数据,可以使用如下代码对数据进行筛选

更新后的数据在value的Struct中被标记为after

package com.haoziqi.app.func;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * description
 * created by A on 2021/3/15
 */
//实现DebeziumDeserializationSchema接口并定义输出数据的类型
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
   @Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
    //定义JSON对象用于存放反序列化后的数据
 JSONObject result = new JSONObject();
    //获取库名和表名
 String topic = sourceRecord.topic();
    String[] split = topic.split(".");
    String database = split[1];
    String table = split[2];
    //获取操作类型
 Envelope.Operation operation = Envelope.operationFor(sourceRecord);
    //获取数据本身
 Struct struct = (Struct) sourceRecord.value();
    Struct after = struct.getStruct("after");
    JSONObject value = new JSONObject();
    if (after != null) {
        Schema schema = after.schema();
        for (Field field : schema.fields()) {
            value.put(field.name(), after.get(field.name()));
        }
    }
    //将数据放入JSON对象
 result.put("database", database);
    result.put("table", table);
    String type = operation.toString().toLowerCase();
    if ("create".equals(type)) {
        type = "insert";
    }
    result.put("type", type);
    result.put("data", value);
    //将数据传输出去
 collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
    return TypeInformation.of(String.class);
}
}

写好格式转换类后,在构建Flink对象时设置.deserializer参数即可

  DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall_flink_0923")
                .deserializer(new MyDeserializationSchemaFunction())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> mySqlDS = env.addSource(sourceFunction);

至此,我们已成功的将CDC采集到的SourceRecord格式转换为了JSON字符串

以上是关于使用Binlog+FlinkCDC实时监控数据的主要内容,如果未能解决你的问题,请参考以下文章

实时数仓与离线数仓架构对比Flink消费流程

FlinkCDC

大数据(9j)FlinkCDC

大数据(9j)FlinkCDC

Flink系列之:基于Flink CDC2.0实现海量数据的实时同步和转换

Flink 版本数据湖(hudi)实时数仓---flinkcdc hudi kafak hive