flink jdbc(mysql)修改为阿里云AnalyticDB方言

Posted 骑电动车的小黄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink jdbc(mysql)修改为阿里云AnalyticDB方言相关的知识,希望对你有一定的参考价值。

前言

阿里云 AnalyticDB(mysql 以下简称 adb数据库,是可以直接使用 Mysql的连接方式,但 adbmysql语法并不完全相同,有一些特殊的语法有些出入,比如我在使用 upset的时候,当我们在插入表时定义了主键,如果主键已经存在并不会继续插入数据而是会去修改之前的数据。

业务场景

使用flink实时流计算从 kafka 获取数据插入到 adb数据库,但是遇到一个问题,建表的时候定义主键然后插入重复的数据应该是修改并不是插入,并且在使用**(upset)** 的时候报错:cannot update pk column SHOP_CODE to expr,当数据库为mysql的时候就没问题,查询了一下相关资料,原来adbupset语法和mysql的语法有一些出入所以这里才会报错。

当前使用的flink版本为 1.13.5,下载源码查看mysql的upset是如何实现 ,github源码下载 Release release-1.13.5 · apache/flink · GitHub 把源码导入到idea中
导入 idea后所有组件都在, maven依赖也是需要很长的时间,中间可能也会有一些包下载失败,我的处理办法是maven直接使用默认的镜像不使用阿里云镜像,但这样操作的前提是电脑开启了科学上网才行。
依赖加载成功后找到jdbc

flink JDBC源码

通过查看源码发现 Flink将onnector接口注册在org.apache.flink.connector.jdbc.dialect.JdbcDialects 中:

同时每个方言都扩展了 AbstractDialect,并且通过查看发现 MySQLDialect自己重写了 getUpsertStatement实现 upsert

自定义adb方言实现upset思路

adb使用的驱动和方言都和 mysql一致,我们只需要重写 getUpsertStatement方法实现 adbupset即可,直接复制一份 MySQLDialect,然后把 canHandle的返回由" jdbc:mysql:"修改为" jdbc:adb:",再修改 getUpsertStatement方法,把 insert 替换为 replace

具体代码实现

修改 AdbDialect 类中两个方法

@OverridepublicbooleancanHandle(String url) 
    return url.startsWith("jdbc:adb:");


@OverridepublicOptional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) 
    returnOptional.of(
            getInsertIntoStatement(tableName, fieldNames).replace("INSERT", "REPLACE"));
org.apache.flink.connector.jdbc.internal.converter包中复制一份 MySQLRowConverter改为 AdbRowConverter,把 converterName修改为 " adb" 。
public classAdbRowConverterextendsAbstractJdbcRowConverter

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() 
        return"adb";
    

    public AdbRowConverter(RowType rowType) 
        super(rowType);
    
最后将 AdbDialect注册到 JdbcDialects
publicfinalclassJdbcDialects 

    privatestaticfinal List<JdbcDialect> DIALECTS =
            Arrays.asList(
                    newDerbyDialect(),
                    newMySQLDialect(),
                    newPostgresDialect(),
                    newAdbDialect());

    /** Fetch the JdbcDialect class corresponding to a given database url. */publicstatic Optional<JdbcDialect> get(String url)
        for (JdbcDialect dialect : DIALECTS) 
            if (dialect.canHandle(url)) 
                return Optional.of(dialect);
            
        
        return Optional.empty();
    
最后还需要修改 org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider 中的 getOrEstablishConnection 方法,当加载驱动的时候传入 jdbc:adb 改为 jdbc:mysql 让它去获取 mysql 的驱动
@Overridepublic Connection getOrEstablishConnection()throws SQLException, ClassNotFoundException 
    if (connection != null) 
        return connection;
    
    if (jdbcOptions.getDriverName() == null) 
        connection =
                DriverManager.getConnection(
                        jdbcOptions.getDbURL(),
                        jdbcOptions.getUsername().orElse(null),
                        jdbcOptions.getPassword().orElse(null));
     else 
        Driverdriver= getLoadedDriver();
        Propertiesinfo=newProperties();
        jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
        jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
        StringdbURL= jdbcOptions.getDbURL();
        if (jdbcOptions.getDbURL().substring(0, 8).equals("jdbc:adb")) 
            dbURL = "jdbc:mysql" + jdbcOptions.getDbURL().substring(8);
        
        connection = driver.connect(dbURL, info);
        if (connection == null) 
            // Throw same exception as DriverManager.getConnection when no driver found to match// caller expectation.thrownewSQLException(
                    "No suitable driver found for " + jdbcOptions.getDbURL() + "08001");
        
    
    return connection;

连接方式

还需要注意一个问题,就是我们修改了这些在使用的时候这样进行链接处理,但是我们并没有 adb的驱动包,要做的就是当我们使用 adb的时候让它去加载 mysql的驱动。把 jdbc后面 mysql修改为 adb,具体连接方式:
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:adb://127.0.0.1:3306/text',
    'username' = '',
    'password' = '',
    'driver'     = 'com.mysql.jdbc.Driver',
    'table-name' = 'sys_dict'
);

可能出现的问题

如果打包的 idea使用的 jdk11flink服务器上运行的是 jdk 1.8 ,那么就会报错,我们要么就升级服务器版本,要么就在 idea中更改项目版本,点击 maven -> Profiles,把 jdk11去掉,选中 jdk1.8然后打包就可以解决问题了

以上是关于flink jdbc(mysql)修改为阿里云AnalyticDB方言的主要内容,如果未能解决你的问题,请参考以下文章

阿里云服务器 Ubuntu Mysql数据库无法连接

阿里云服务器 Ubuntu Mysql数据库无法连接

阿里云服务器 Ubuntu Mysql数据库无法连接

flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp

阿里云EMR实战篇以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤

远程连接阿里云服务器上的MySQL