flink jdbc(mysql)修改为阿里云AnalyticDB方言
Posted 骑电动车的小黄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink jdbc(mysql)修改为阿里云AnalyticDB方言相关的知识,希望对你有一定的参考价值。
前言
阿里云 AnalyticDB(mysql) 以下简称 adb数据库,是可以直接使用 Mysql的连接方式,但 adb与 mysql语法并不完全相同,有一些特殊的语法有些出入,比如我在使用 upset的时候,当我们在插入表时定义了主键,如果主键已经存在并不会继续插入数据而是会去修改之前的数据。
业务场景
使用flink实时流计算从 kafka 获取数据插入到 adb数据库,但是遇到一个问题,建表的时候定义主键然后插入重复的数据应该是修改并不是插入,并且在使用**(upset)** 的时候报错:cannot update pk column SHOP_CODE to expr,当数据库为mysql的时候就没问题,查询了一下相关资料,原来adb的upset语法和mysql的语法有一些出入所以这里才会报错。
当前使用的flink版本为 1.13.5,下载源码查看mysql的upset是如何实现 ,github源码下载 Release release-1.13.5 · apache/flink · GitHub 把源码导入到idea中
导入 idea后所有组件都在, maven依赖也是需要很长的时间,中间可能也会有一些包下载失败,我的处理办法是maven直接使用默认的镜像不使用阿里云镜像,但这样操作的前提是电脑开启了科学上网才行。
依赖加载成功后找到jdbc
flink JDBC源码
通过查看源码发现 Flink将onnector接口注册在org.apache.flink.connector.jdbc.dialect.JdbcDialects 中:
同时每个方言都扩展了 AbstractDialect,并且通过查看发现 MySQLDialect自己重写了 getUpsertStatement实现 upsert。
自定义adb方言实现upset思路
adb使用的驱动和方言都和 mysql一致,我们只需要重写 getUpsertStatement方法实现 adb的 upset即可,直接复制一份 MySQLDialect,然后把 canHandle的返回由" jdbc:mysql:"修改为" jdbc:adb:",再修改 getUpsertStatement方法,把 insert 替换为 replace
具体代码实现
修改 AdbDialect 类中两个方法
@OverridepublicbooleancanHandle(String url)
return url.startsWith("jdbc:adb:");
@OverridepublicOptional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields)
returnOptional.of(
getInsertIntoStatement(tableName, fieldNames).replace("INSERT", "REPLACE"));
在 org.apache.flink.connector.jdbc.internal.converter包中复制一份 MySQLRowConverter改为 AdbRowConverter,把 converterName修改为 " adb" 。
public classAdbRowConverterextendsAbstractJdbcRowConverter
private static final long serialVersionUID = 1L;
@Override
public String converterName()
return"adb";
public AdbRowConverter(RowType rowType)
super(rowType);
最后将 AdbDialect注册到 JdbcDialects 中
publicfinalclassJdbcDialects
privatestaticfinal List<JdbcDialect> DIALECTS =
Arrays.asList(
newDerbyDialect(),
newMySQLDialect(),
newPostgresDialect(),
newAdbDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */publicstatic Optional<JdbcDialect> get(String url)
for (JdbcDialect dialect : DIALECTS)
if (dialect.canHandle(url))
return Optional.of(dialect);
return Optional.empty();
最后还需要修改 org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider 中的 getOrEstablishConnection 方法,当加载驱动的时候传入 jdbc:adb 改为 jdbc:mysql 让它去获取 mysql 的驱动
@Overridepublic Connection getOrEstablishConnection()throws SQLException, ClassNotFoundException
if (connection != null)
return connection;
if (jdbcOptions.getDriverName() == null)
connection =
DriverManager.getConnection(
jdbcOptions.getDbURL(),
jdbcOptions.getUsername().orElse(null),
jdbcOptions.getPassword().orElse(null));
else
Driverdriver= getLoadedDriver();
Propertiesinfo=newProperties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
StringdbURL= jdbcOptions.getDbURL();
if (jdbcOptions.getDbURL().substring(0, 8).equals("jdbc:adb"))
dbURL = "jdbc:mysql" + jdbcOptions.getDbURL().substring(8);
connection = driver.connect(dbURL, info);
if (connection == null)
// Throw same exception as DriverManager.getConnection when no driver found to match// caller expectation.thrownewSQLException(
"No suitable driver found for " + jdbcOptions.getDbURL() + "08001");
return connection;
连接方式
还需要注意一个问题,就是我们修改了这些在使用的时候这样进行链接处理,但是我们并没有 adb的驱动包,要做的就是当我们使用 adb的时候让它去加载 mysql的驱动。把 jdbc后面 mysql修改为 adb,具体连接方式:
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:adb://127.0.0.1:3306/text',
'username' = '',
'password' = '',
'driver' = 'com.mysql.jdbc.Driver',
'table-name' = 'sys_dict'
);
可能出现的问题
如果打包的 idea使用的 jdk11, flink服务器上运行的是 jdk 1.8 ,那么就会报错,我们要么就升级服务器版本,要么就在 idea中更改项目版本,点击 maven -> Profiles,把 jdk11去掉,选中 jdk1.8然后打包就可以解决问题了
以上是关于flink jdbc(mysql)修改为阿里云AnalyticDB方言的主要内容,如果未能解决你的问题,请参考以下文章
flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp