flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp

Posted roman_日积跬步-终至千里

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp相关的知识,希望对你有一定的参考价值。

文章目录

一、问题概述

由于mysql 驱动版本更新,导致flink在消费mysql数据时不能正确将datetime类型数据转换为flink的Timestamp。

通过降低mysql驱动到8.0.18自测发现问题不再出现

但低版本mysql驱动会有漏洞扫描问题,故此方式不能采取,进而考虑修改源码。

使用到的flink sql 如下:

CREATE TABLE `test_gao_0519_02` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'password' = '11111111',
   'table-name' = 'auto_mysql22'
 );

CREATE TABLE `test_gao` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'print'
 );

INSERT INTO `test_gao`
 (SELECT `birthday`
 FROM `test_gao_0519_02`);

二、问题分析与解决

1. 源码分析和模型简化

1.1. 简化并解决类型转换问题

Flink jdbc connector对于类型转换逻辑主要在AbstractJdbcRowConverter类实现,通过debug定位到类型转换的错误为,如下截图位置

通过debug分析是因为mysql-connector-java-8.0.28对于mysql的datetime类型识别为localdatetime,使得数据强转Timestamp类型失败,那既然这样可以将问题简化为:

通过JDBC消费mysql中datetime类型识别为localdatetime,而不是Timestamp

测试jdbc程序

@Log4j2
public class FirstExample 
    // JDBC driver name and database URL
    static final String DB_URL = "jdbc:mysql://localhost:3306/dataflow_test";
    //  Database credentials
    static final String USER = "root";
    static final String PASS = "11111111";

    public static void main(String[] args) 
        Connection conn = null;
        PreparedStatement stmt1 = null;
        PreparedStatement stmt2 = null;
        try 
            //STEP 2: Register JDBC driver
            Class.forName("com.mysql.cj.jdbc.Driver");
            //STEP 3: Open a connection
            log.info("Connecting to database...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);

            //STEP 4: Execute a query
            log.info("Creating statement...");
            String sql = "SELECT * FROM dataflow_test.auto_mysql22";
            stmt1 = conn.prepareStatement(sql);
            ResultSet resultSet = stmt1.executeQuery();


            //STEP 5: Extract data from result set
            while (resultSet.next()) 
                //Retrieve by column name
                Date birthday = resultSet.getDate("birthday");
                Object object = resultSet.getObject(3);
                LocalDateTime object1 = (LocalDateTime) object;
                long time = Timestamp.valueOf(object1).getTime();
                System.out.println(time);

                //Display values
                log.info("birthday: ", birthday);
            
            //STEP 6: Clean-up environment
            。。。

看到数据能够通过JDBC消费成功,那到此就解决第一步:数据类型的转换。类型转换的关键代码是:

Date birthday = resultSet.getDate("birthday");
Object object = resultSet.getObject(3);
LocalDateTime object1 = (LocalDateTime) object;
long time = Timestamp.valueOf(object1).getTime();

2. 源码改造

解决了前面一小步数据类型的转换,下面尝试修改源码。
思路很简单,就是在AbstractJdbcRowConverter类中对于timestamp这个类型下的数据类型转换逻辑下进行适配,添加以下代码

return new JdbcDeserializationConverter() 
                    @Override
                    public Object deserialize(Object jdbcField) throws SQLException 
                        if (jdbcField instanceof LocalDateTime) 
                            return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
                        
                        return TimestampData.fromTimestamp((Timestamp) jdbcField);
                    
                ;

需要注意的两点:
1.修改的程序不能影响原有源码执行逻辑
2.能够预想到修改完逻辑会影响的范围

2.1. 修改尝试

适配完逻辑之后,适配发现AbstractJdbcRowConverter似乎并不能兼容我对源码的修改,测试发现直接报类似类型转换错误。。。

观察到AbstractJdbcRowConverter是一个abstract类,那继续尝试在MySQLRowConverter,完善类型转换逻辑。

2.2. 正解


package org.apache.flink.connector.jdbc.internal.converter;
。。。

/**
 * Runtime converter that responsible to convert between JDBC object and Flink internal object for
 * MySQL.
 */
public class MySQLRowConverter extends AbstractJdbcRowConverter 

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() 
        return "MySQL";
    

    public MySQLRowConverter(RowType rowType) 
        super(rowType);
    


    @Override
    protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) 
        return wrapIntoNullableInternalConverter(createInternalConverter(type));
    


    /**
     * 重新实现JdbcDeserializationConverter实例:添加处理 数据为null时的处理逻辑。
     *
     * @param jdbcDeserializationConverter
     * @return
     */
    @Override
    protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
            JdbcDeserializationConverter jdbcDeserializationConverter) 
        return val -> 
            if (val == null) 
                return null;
             else 
                return jdbcDeserializationConverter.deserialize(val);
            
        ;
    


    /**
     * 用于 jdbc 数据 转为 row type 的实例
     * <p>
     * 用于识别并处理mysql自己的数据类型
     *
     * @param type
     * @return
     */
    @Override
    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) 
        switch (type.getTypeRoot()) 
            case NULL:
                return val -> null;
            case BOOLEAN:
            case FLOAT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
                return val -> val;
            case TINYINT:
                return val -> ((Integer) val).byteValue();
            case SMALLINT:
                // Converter for small type that casts value to int and then return short value,
                // since
                // JDBC 1.0 use int type for small values.
                return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
            case INTEGER:
                return val -> val;
            case BIGINT:
                return val -> val;
            case DECIMAL:
                final int precision = ((DecimalType) type).getPrecision();
                final int scale = ((DecimalType) type).getScale();
                // using decimal(20, 0) to support db type bigint unsigned, user should define
                // decimal(20, 0) in SQL,
                // but other precision like decimal(30, 0) can work too from lenient consideration.
                return val ->
                        val instanceof BigInteger
                                ? DecimalData.fromBigDecimal(
                                new BigDecimal((BigInteger) val, 0), precision, scale)
                                : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
            case DATE:
                return val -> (int) (((Date) val).toLocalDate().toEpochDay());
            case TIME_WITHOUT_TIME_ZONE:
                return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return new JdbcDeserializationConverter() 
                    @Override
                    public Object deserialize(Object jdbcField) throws SQLException 
                        if (jdbcField instanceof LocalDateTime) 
                            return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) jdbcField));
                        
                        return TimestampData.fromTimestamp((Timestamp) jdbcField);
                    
                ;

            case CHAR:
            case VARCHAR:
                return val -> StringData.fromString((String) val);
            case BINARY:
            case VARBINARY:
                return val -> (byte[]) val;
            case ARRAY:
            case ROW:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        
    

再次测试发现:数据消费成功了!

三、总结-bug自测收尾

类型转换并消费成功后,还需要考虑是否能再次将数据写入到数据库中,测试:

CREATE TABLE `test_gao_0519_02` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'password' = '11111111',
   'table-name' = 'auto_mysql22'
 );

CREATE TABLE `test_gao` (
   `birthday` TIMESTAMP
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/dataflow_test',
   'username' = 'root',
   'password' = '11111111',
   'table-name' = 'auto_mysql22'
 );
INSERT INTO `test_gao`
 (SELECT `birthday`
 FROM `test_gao_0519_02`);

观察能消费,并能写入到数据库:

到此问题解决。

以上是关于flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp的主要内容,如果未能解决你的问题,请参考以下文章

将通过 JDBC 检索到的 java.sql.Date 值转换为 java.time.LocalDate? [复制]

无法将 java.lang.String 类型的属性值转换为所需的 java.time.LocalDateTime 类型

未找到类型的即时转换器:java.time.ZonedDateTime

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink输出到JDBC