A type conversion problem with the Flink JDBC SQL Connector
Posted by 鸿乃江边鸟
Background
Flink 1.3
While writing Flink SQL recently, I ran into java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
Problem
Analysis
Here is the table definition used by the SQL that raised the error:
CREATE TABLE `xxx` (
`merchantId` BIGINT,
`userId` BIGINT,
`status` BIGINT
) WITH (
);
The full stack trace is as follows:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
at JoinTableFuncCollector$9.collect(Unknown Source)
at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
at LookupFunction$4.flatMap(Unknown Source)
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$67.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
at StreamExecCorrelate$27.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$17.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
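This is a type mismatch that shows up when MySQL column types are mapped to Flink SQL types. The trace shows the JDBC lookup function handing a row to the join, which then calls getLong on a field that actually holds an Integer.
The corresponding MySQL columns are defined as follows: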
+------------+---------------------+------+-----+---------+----------------+
| Field      | Type                | Null | Key | Default | Extra          |
+------------+---------------------+------+-----+---------+----------------+
| merchantId | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId     | bigint(20)          | NO   | MUL | NULL    |                |
| status     | tinyint(1) unsigned | NO   |     | 1       |                |
+------------+---------------------+------+-----+---------+----------------+
Now compare this with the Data Type Mapping section of the Flink documentation:
MySQL type | Flink SQL type |
---|---|
INT UNSIGNED | BIGINT |
BIGINT | BIGINT |
TINYINT | TINYINT |
INT | INT |
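To check a column definition against the rows quoted above, a small lookup can help. The following is only a sketch: the class name MysqlToFlinkType and its methods are hypothetical, and the map contains just the four rows listed in this post, not Flink's complete mapping. It strips the display width, such as (11), before looking the type up.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class MysqlToFlinkType {
    // Only the four rows quoted in this post; NOT the complete Flink mapping.
    private static final Map<String, String> MAPPING = Map.of(
            "INT UNSIGNED", "BIGINT",
            "BIGINT", "BIGINT",
            "TINYINT", "TINYINT",
            "INT", "INT");

    // Turns e.g. "int(11) unsigned" into "INT UNSIGNED".
    static String normalize(String mysqlColumnType) {
        return mysqlColumnType
                .toUpperCase(Locale.ROOT)
                .replaceAll("\\(\\d+\\)", "")   // drop the display width
                .replaceAll("\\s+", " ")        // collapse whitespace
                .trim();
    }

    // Returns the Flink SQL type for a MySQL column type, or empty if the
    // type is not among the rows quoted in this post.
    static Optional<String> flinkTypeFor(String mysqlColumnType) {
        return Optional.ofNullable(MAPPING.get(normalize(mysqlColumnType)));
    }

    public static void main(String[] args) {
        System.out.println(flinkTypeFor("int(11) unsigned"));    // Optional[BIGINT]
        System.out.println(flinkTypeFor("bigint(20)"));          // Optional[BIGINT]
        System.out.println(flinkTypeFor("tinyint(1) unsigned")); // Optional.empty
    }
}
```

The empty result for tinyint(1) unsigned is deliberate: that type is not among the quoted rows, so the sketch reports it as unknown rather than guessing.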
Note: a MySQL column of type INT UNSIGNED should be mapped to BIGINT in Flink, not INT.
Otherwise the opposite error is reported:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
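Both exceptions come from the same mechanism: the row stores boxed values, and the typed getter casts to whatever the declared column type expects. The sketch below reproduces this without any Flink dependency. SimpleRow is a hypothetical, simplified stand-in for a generic row, not Flink's actual class, and the values are assumed to arrive from the driver as Integer for the status column and Long for the INT UNSIGNED column, which is what the two error messages suggest.

```java
public class RowCastDemo {
    /** Hypothetical stand-in for a generic row: boxed fields, typed getters that cast. */
    static final class SimpleRow {
        private final Object[] fields;

        SimpleRow(Object... fields) {
            this.fields = fields;
        }

        // Declared type BIGINT: the reader expects a java.lang.Long.
        long getLong(int pos) {
            return (Long) fields[pos];
        }

        // Declared type INT: the reader expects a java.lang.Integer.
        int getInt(int pos) {
            return (Integer) fields[pos];
        }
    }

    // Runs a read and reports whether the cast succeeded.
    static String tryRead(Runnable read) {
        try {
            read.run();
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Field 0: value delivered as Integer (assumed for tinyint unsigned).
        // Field 1: value delivered as Long (assumed for int unsigned).
        SimpleRow row = new SimpleRow(Integer.valueOf(1), Long.valueOf(42L));

        System.out.println(tryRead(() -> row.getLong(0))); // ClassCastException: Integer read as BIGINT
        System.out.println(tryRead(() -> row.getInt(0)));  // ok: Integer read as INT
        System.out.println(tryRead(() -> row.getInt(1)));  // ClassCastException: Long read as INT
        System.out.println(tryRead(() -> row.getLong(1))); // ok: Long read as BIGINT
    }
}
```

A cast between boxed types never converts the number. It only checks the runtime class, which is why the declared Flink type has to match the class of the value that actually arrives.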
Solution
Change the type of the mismatched column (here status, from BIGINT to INT) and the error goes away:
CREATE TABLE `dim_merchant` (
`merchantId` BIGINT,
`userId` BIGINT,
`status` BIGINT
) WITH (
);
||
\/
CREATE TABLE `dim_merchant` (
`merchantId` BIGINT,
`userId` BIGINT,
`status` INT
) WITH (
);