flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp
Posted roman_日积跬步-终至千里
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp相关的知识,希望对你有一定的参考价值。
文章目录
一、问题概述
由于mysql 驱动版本更新,导致flink在消费mysql数据时不能正确将datetime类型数据转换为flink的Timestamp。
通过降低mysql驱动到8.0.18自测发现问题不再出现
但低版本mysql驱动会有漏洞扫描问题,故此方式不能采取,进而考虑修改源码。
使用到的flink sql 如下:
CREATE TABLE `test_gao_0519_02` (
`birthday` TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
'username' = 'root',
'password' = '11111111',
'table-name' = 'auto_mysql22'
);
CREATE TABLE `test_gao` (
`birthday` TIMESTAMP
) WITH (
'connector' = 'print'
);
INSERT INTO `test_gao`
(SELECT `birthday`
FROM `test_gao_0519_02`);
二、问题分析与解决
1. 源码分析和模型简化
1.1. 简化并解决类型转换问题
Flink jdbc connector对于类型转换逻辑主要在AbstractJdbcRowConverter类实现,通过debug定位到类型转换的错误为,如下截图位置
通过debug分析是因为mysql-connector-java-8.0.28对于mysql的datetime类型识别为localdatetime,使得数据强转Timestamp类型失败,那既然这样可以将问题简化为:
通过JDBC消费mysql中datetime类型识别为localdatetime,而不是Timestamp
测试jdbc程序
@Log4j2
public class FirstExample
// JDBC driver name and database URL
static final String DB_URL = "jdbc:mysql://localhost:3306/dataflow_test";
// Database credentials
static final String USER = "root";
static final String PASS = "11111111";
public static void main(String[] args)
Connection conn = null;
PreparedStatement stmt1 = null;
PreparedStatement stmt2 = null;
try
//STEP 2: Register JDBC driver
Class.forName("com.mysql.cj.jdbc.Driver");
//STEP 3: Open a connection
log.info("Connecting to database...");
conn = DriverManager.getConnection(DB_URL, USER, PASS);
//STEP 4: Execute a query
log.info("Creating statement...");
String sql = "SELECT * FROM dataflow_test.auto_mysql22";
stmt1 = conn.prepareStatement(sql);
ResultSet resultSet = stmt1.executeQuery();
//STEP 5: Extract data from result set
while (resultSet.next())
//Retrieve by column name
Date birthday = resultSet.getDate("birthday");
Object object = resultSet.getObject(3);
LocalDateTime object1 = (LocalDateTime) object;
long time = Timestamp.valueOf(object1).getTime();
System.out.println(time);
//Display values
log.info("birthday: ", birthday);
//STEP 6: Clean-up environment
。。。
看到数据能够通过JDBC消费成功,那到此就解决第一步:数据类型的转换。类型转换的关键代码是:
Date birthday = resultSet.getDate("birthday");
Object object = resultSet.getObject(3);
LocalDateTime object1 = (LocalDateTime) object;
long time = Timestamp.valueOf(object1).getTime();
2. 源码改造
解决了前面一小步数据类型的转换,下面尝试修改源码。
思路很简单,就是在AbstractJdbcRowConverter类中对于timestamp这个类型下的数据类型转换逻辑下进行适配,添加以下代码
return new JdbcDeserializationConverter()
@Override
public Object deserialize(Object jdbcField) throws SQLException
if (jdbcField instanceof LocalDateTime)
return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
return TimestampData.fromTimestamp((Timestamp) jdbcField);
;
需要注意的两点:
1.修改的程序不能影响原有源码执行逻辑
2.能够预想到修改完逻辑会影响的范围
2.1. 修改尝试
适配完逻辑之后,适配发现AbstractJdbcRowConverter似乎并不能兼容我对源码的修改,测试发现直接报类似类型转换错误。。。
观察到AbstractJdbcRowConverter是一个abstract类,那继续尝试在MySQLRowConverter,完善类型转换逻辑。
2.2. 正解
package org.apache.flink.connector.jdbc.internal.converter;
。。。
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* MySQL.
*/
public class MySQLRowConverter extends AbstractJdbcRowConverter
private static final long serialVersionUID = 1L;
@Override
public String converterName()
return "MySQL";
public MySQLRowConverter(RowType rowType)
super(rowType);
@Override
protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type)
return wrapIntoNullableInternalConverter(createInternalConverter(type));
/**
* 重新实现JdbcDeserializationConverter实例:添加处理 数据为null时的处理逻辑。
*
* @param jdbcDeserializationConverter
* @return
*/
@Override
protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
JdbcDeserializationConverter jdbcDeserializationConverter)
return val ->
if (val == null)
return null;
else
return jdbcDeserializationConverter.deserialize(val);
;
/**
* 用于 jdbc 数据 转为 row type 的实例
* <p>
* 用于识别并处理mysql自己的数据类型
*
* @param type
* @return
*/
@Override
protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
switch (type.getTypeRoot())
case NULL:
return val -> null;
case BOOLEAN:
case FLOAT:
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
case SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
case INTEGER:
return val -> val;
case BIGINT:
return val -> val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
return val -> (int) (((Date) val).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return new JdbcDeserializationConverter()
@Override
public Object deserialize(Object jdbcField) throws SQLException
if (jdbcField instanceof LocalDateTime)
return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
return TimestampData.fromTimestamp((Timestamp) jdbcField);
;
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case BINARY:
case VARBINARY:
return val -> (byte[]) val;
case ARRAY:
case ROW:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
再次测试发现:数据消费成功了!
三、总结-bug自测收尾
类型转换并消费成功后,还需要考虑是否能再次将数据写入到数据库中,测试:
CREATE TABLE `test_gao_0519_02` (
`birthday` TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
'username' = 'root',
'password' = '11111111',
'table-name' = 'auto_mysql22'
);
CREATE TABLE `test_gao` (
`birthday` TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
'username' = 'root',
'password' = '11111111',
'table-name' = 'auto_mysql22'
);
INSERT INTO `test_gao`
(SELECT `birthday`
FROM `test_gao_0519_02`);
观察能消费,并能写入到数据库:
到此问题解决。
以上是关于flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp的主要内容,如果未能解决你的问题,请参考以下文章
将通过 JDBC 检索到的 java.sql.Date 值转换为 java.time.LocalDate? [复制]
无法将 java.lang.String 类型的属性值转换为所需的 java.time.LocalDateTime 类型
未找到类型的即时转换器:java.time.ZonedDateTime
Flink JDBC Connector:Flink 与数据库集成最佳实践