手把手构建基于 GBase8s 的 Flink connector

Posted 麒思妙想

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手把手构建基于 GBase8s 的 Flink connector相关的知识,希望对你有一定的参考价值。

简介

本篇文章,首先会向大家阐述什么是 Flink connector 和 CDC , 然后会通过手把手的方式和大家一起构建一个简单的GBase8s的Flink connector,并完成实践项目,即通过mysql CDC实时通过connector同步数据到GBase8s中。

什么是 Flink connector

Flink内置了一些基本数据源和接收器,这些数据源和接收器始终可用。该预定义的数据源包括文件、Mysql、RabbitMq、Kafka、ES等,同时也支持数据输出到文件、Mysql、RabbitMq、Kafka、ES等。

简单的说:flink连接器就是将某些数据源加载与数据输出做了封装(连接器),我们只要引入对应的连接器依赖,即可快速的完成对数据源的加载以及数据的输出。

什么是CDC(Change Data Capture)

首先什么是CDC ?它是Change Data Capture的缩写,即变更数据捕捉的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等操作。

其主要的应用场景:

  • 异构数据库之间的数据同步或备份 / 建立数据分析计算平台

  • 微服务之间共享数据状态

  • 更新缓存 / CQRS 的 Query 视图更新

CDC 它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,可以从下面表格对比他们功能和差异点。

基于查询的 CDC基于日志的 CDC
概念每次捕获变更发起 Select 查询进行全表扫描,过滤出查询之间变更的数据读取数据存储系统的 log ,例如 MySQL 里面的 binlog持续监控
开源产品Sqoop, Kafka JDBC SourceCanal, Maxwell, Debezium
执行模式BatchStreaming
捕获所有数据的变化
低延迟,不增加数据库负载
不侵入业务(LastUpdated字段)
捕获删除事件和旧记录的状态
捕获旧记录的状态

flink-connector-gbasedbt

我们其实是可以自己手写Sink将CDC的数据直接汇入我们的目标数据库的。这样是不是不够优雅?我们是不是可以通过Flink SQL的方式将数据汇入到GBase8s呢?答案是肯定的,接下来我们就来实现一个简单的GBase8s的Flink connector

  1. 构建 行转换器(RowConverter)

  2. 构建 方言(Dialect)

  3. 注册动态表工厂(DynamicTableFactory),以及相关Sink程序

经过上面三步,就可以实现一个简单的connector了。接下来我们就来看,如何实现:

构建 行转换器(RowConverter)

package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
* @author lijiaqi
*/
public class GBasedbtRowConverter extends AbstractJdbcRowConverter 

   public GBasedbtRowConverter(RowType rowType) 
       super(rowType);
  

   private static final long serialVersionUID = 1L;

   @Override
   public String converterName() 
       return "gbasedbt";
  

构建 方言(Dialect)

package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
*
* @author lijiaqi
*/
public class GBasedbtDialect implements JdbcDialect 

   private static final long serialVersionUID = 1L;

   @Override
   public String dialectName() 
       return "gbasedbt";
  

   @Override
   public boolean canHandle(String url) 
       return url.startsWith("jdbc:gbasedbt-sqli:");
  

   @Override
   public JdbcRowConverter getRowConverter(RowType rowType) 
       return new GBasedbtRowConverter(rowType);
  

   @Override
   public String getLimitClause(long l) 
       return null;
  

   @Override
   public void validate(TableSchema schema) throws ValidationException 
       JdbcDialect.super.validate(schema);
  

   @Override
   public Optional<String> defaultDriverName() 
       return Optional.of("com.gbasedbt.jdbc.Driver");
  

   @Override
   public String quoteIdentifier(String identifier) 
       return "'" + identifier + "'";
  

   @Override
   public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) 
       return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
  

   @Override
   public String getRowExistsStatement(String tableName, String[] conditionFields) 
       return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
  

   @Override
   public String getInsertIntoStatement(String tableName, String[] fieldNames) 
       return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
  

   @Override
   public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) 
       return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
  

   @Override
   public String getDeleteStatement(String tableName, String[] conditionFields) 
       return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
  

   @Override
   public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) 
       return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
  

注册动态表工厂(DynamicTableFactory),以及相关Sink程序

首先创建 GBasedbtSinkFunction 用于接受RowData数据输入,并将其Sink到配置的数据库中

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
* @author lijiaqi
*/
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> 

   private static final long serialVersionUID = 1L;

   private final JdbcOptions jdbcOptions;
   private final SerializationSchema<RowData> serializationSchema = null;
   private DataType dateType;

   private Connection conn;
   private Statement stmt;

   public GBasedbtSinkFunction(JdbcOptions jdbcOptions) 
       this.jdbcOptions = jdbcOptions;
  

   public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) 
       this.jdbcOptions = jdbcOptions;
       this.dateType = dataType;
  

   @Override
   public void open(Configuration parameters) 
       System.out.println("open connection !!!!!");
       try 
           if (null == conn) 
               Class.forName(jdbcOptions.getDriverName());
               conn = DriverManager.getConnection(jdbcOptions.getDbURL(),jdbcOptions.getUsername().orElse(null),jdbcOptions.getPassword().orElse(null));
          
       catch (Exception e) 
           e.printStackTrace();
      
  

   @Override
   public void invoke(RowData value, Context context) throws Exception 

       try 
           stmt = conn.createStatement();
           String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
           for (int i = 0; i < value.getArity(); i++) 
               //这里需要根据事情类型进行匹配
               if(dateType.getChildren().get(i).getConversionClass().equals(Integer.class))
                   sql += +value.getInt(i)+ " ,";
              else 
                   sql += "'"+value.getString(i) + "' ,";
              
          
           sql = sql.substring(0, sql.length() - 1);
           sql += " ); ";

           System.out.println("sql ==>" + sql);

           stmt.execute(sql);
      catch(Exception e)
           e.printStackTrace();
      
  

   @Override
   public void close() throws Exception 
       if (stmt != null) 
           stmt.close();
      
       if (conn != null) 
           conn.close();
      
  

构建 GBasedbtDynamicTableSink  

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
* @author lijiaqi
*/
public class GBasedbtDynamicTableSink implements DynamicTableSink 

   private final JdbcOptions jdbcOptions;
   private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
   private final DataType dataType;

   public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) 
       this.jdbcOptions = jdbcOptions;
       this.encodingFormat = encodingFormat;
       this.dataType = dataType;
  

   @Override
   public ChangelogMode getChangelogMode(ChangelogMode requestedMode) 
       return requestedMode;
  

   @Override
   public SinkRuntimeProvider getSinkRuntimeProvider(Context context) 
       System.out.println("SinkRuntimeProvider");
       System.out.println(dataType);
       GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions,dataType);
       return SinkFunctionProvider.of(gbasedbtSinkFunction);
  

   @Override
   public DynamicTableSink copy() 
       return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
  

   @Override
   public String asSummaryString() 
       return "gbasedbt Table Sink";
  

构建GBasedbtDynamicTableFactory

package wang.datahub.table;


import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
* @author lijiaqi
*/
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory 

   public static final String IDENTIFIER = "gbasedbt";

   private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

   public static final ConfigOption<String> URL = ConfigOptions
          .key("url")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc database url.");

   public static final ConfigOption<String> DRIVER = ConfigOptions
          .key("driver")
          .stringType()
          .defaultValue(DRIVER_NAME)
          .withDescription("the jdbc driver.");

   public static final ConfigOption<String> TABLE_NAME = ConfigOptions
          .key("table-name")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc table name.");

   public static final ConfigOption<String> USERNAME = ConfigOptions
          .key("username")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc user name.");

   public static final ConfigOption<String> PASSWORD = ConfigOptions
          .key("password")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc password.");

//   public static final ConfigOption<String> FORMAT = ConfigOptions
//           .key("format")
//           .stringType()
//           .noDefaultValue()
//           .withDescription("the format.");

   @Override
   public String factoryIdentifier() 
       return IDENTIFIER;
  

   @Override
   public Set<ConfigOption<?>> requiredOptions() 
       Set<ConfigOption<?>> requiredOptions = new HashSet<>();
       requiredOptions.add(URL);
       requiredOptions.add(TABLE_NAME);
       requiredOptions.add(USERNAME);
       requiredOptions.add(PASSWORD);
//       requiredOptions.add(FORMAT);
       return requiredOptions;
  

   @Override
   public Set<ConfigOption<?>> optionalOptions() 
       return new HashSet<>();
  

   @Override
   public DynamicTableSource createDynamicTableSource(Context context) 
       
       final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

       final ReadableConfig config = helper.getOptions();
       
       helper.validate();

       JdbcOptions jdbcOptions = getJdbcOptions(config);
       
       TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

       return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);

  

   @Override
   public DynamicTableSink createDynamicTableSink(Context context) 
       
       final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

//       final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//               SerializationFormatFactory.class,
//               FactoryUtil.FORMAT);

       final ReadableConfig config = helper.getOptions();

       helper.validate();

       JdbcOptions jdbcOptions = getJdbcOptions(config);

       final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

       return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
  

   private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) 
       final String url = readableConfig.get(URL);
       final JdbcOptions.Builder builder = JdbcOptions.builder()
              .setDriverName(DRIVER_NAME)
              .setDBUrl(url)
              .setTableName(readableConfig.get(TABLE_NAME))
              .setDialect(new GBasedbtDialect());

       readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
       readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
       return builder.build();
  

接下来通过SPI注册动态表:创建文件resources\\META-INF\\services\\org.apache.flink.table.factories.Factory内容注册为wang.datahub.table.GBasedbtDynamicTableFactory

至此,我们的Flink connector 就构建完成,接下来,我们要使用其,来完成一个真正的项目。

实战项目

下面是项目的整体架构图,我们通过flink cdc 从mysql获取变更数据,然后通过 flink sql 将数据 sink 到 gbase8s 里

接下来,我们看一下如何通过Flink SQL实现CDC ,只需3条SQL语句即可。

创建数据源表

       // 数据源表
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\\n" +
                       " id INT NOT NULL,\\n" +
                       " name STRING,\\n" +
                       " description STRING\\n" +
                       ") WITH (\\n" +
                       " 'connector' = 'mysql-cdc',\\n" +
                       " 'hostname' = 'localhost',\\n" +
                       " 'port' = '3306',\\n" +
                       " 'username' = 'flinkcdc',\\n" +
                       " 'password' = '123456',\\n" +
                       " 'database-name' = 'test',\\n" +
                       " 'table-name' = 'test_cdc'\\n" +
                       ")";

创建输出表,输出到GBase8s ,这里 connector设置成gbasedbt

       String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
       String userName = "gbasedbt";
       String password = "123456";
       String gbasedbtSinkTable = "ta";
       // 输出目标表
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\\n" +
                       " id INT NOT NULL,\\n" +
                       " name STRING,\\n" +
                       " description STRING,\\n" +
                       " PRIMARY KEY (id) NOT ENFORCED \\n " +
                       ") WITH (\\n" +
                       " 'connector' = 'gbasedbt',\\n" +
//                       " 'driver' = 'com.gbasedbt.jdbc.Driver',\\n" +
                       " 'url' = '" + url + "',\\n" +
                       " 'username' = '" + userName + "',\\n" +
                       " 'password' = '" + password + "',\\n" +
                       " 'table-name' = '" + gbasedbtSinkTable + "' \\n" +
                       ")";

这里我们直接将数据汇入

       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";

完整参考代码

package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain 
   public static void main(String[] args) throws Exception 
       EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build();
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);



       tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);


       // 数据源表
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\\n" +
                       " id INT NOT NULL,\\n" +
                       " name STRING,\\n" +
                       " description STRING\\n" +
                       ") WITH (\\n" +
                       " 'connector' = 'mysql-cdc',\\n" +
                       " 'hostname' = 'localhost',\\n" +
                       " 'port' = '3306',\\n" +
                       " 'username' = 'flinkcdc',\\n" +
                       " 'password' = '123456',\\n" +
                       " 'database-name' = 'test',\\n" +
                       " 'table-name' = 'test_cdc'\\n" +
                       ")";


       String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
       String userName = "gbasedbt";
       String password = "123456";
       String gbasedbtSinkTable = "ta";
       // 输出目标表
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\\n" +
                       " id INT NOT NULL,\\n" +
                       " name STRING,\\n" +
                       " description STRING,\\n" +
                       " PRIMARY KEY (id) NOT ENFORCED \\n " +
                       ") WITH (\\n" +
                       " 'connector' = 'gbasedbt',\\n" +
//                       " 'driver' = 'com.gbasedbt.jdbc.Driver',\\n" +
                       " 'url' = '" + url + "',\\n" +
                       " 'username' = '" + userName + "',\\n" +
                       " 'password' = '" + password + "',\\n" +
                       " 'table-name' = '" + gbasedbtSinkTable + "' \\n" +
                       ")";

       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";

       tableEnv.executeSql(sourceDDL);
       tableEnv.executeSql(sinkDDL);
       TableResult result = tableEnv.executeSql(transformSQL);

       result.print();
       env.execute("sync-flink-cdc");
  

运行结果

查看数据,已经录入进数据库里

参考链接:

https://blog.csdn.net/zhangjun5965/article/details/107605396

https://cloud.tencent.com/developer/article/1745233?from=article.detail.1747773

https://segmentfault.com/a/1190000039662261

https://www.cnblogs.com/weijiqian/p/13994870.html

https://blog.csdn.net/dafei1288/article/details/118192917

以上是关于手把手构建基于 GBase8s 的 Flink connector的主要内容,如果未能解决你的问题,请参考以下文章

Springboot + Openjpa 整合 GBase8s 实践

Springboot + Openjpa 整合 GBase8s 实践

Springboot + Openjpa 整合 GBase8s 实践

基于GBase8s和Calcite的多数据源查询

基于GBase8s和Calcite的多数据源查询

实战手把手 | Flink安装指南