FLINK JDBC SQL Connector遇到的类型转换问题

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK JDBC SQL Connector遇到的类型转换问题相关的知识,希望对你有一定的参考价值。

背景

Flink 1.3
最近在写Flink Sql的时候,遇到了java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long 问题

分析

直接上报错的sql,如下:

CREATE TABLE `xxx` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  
);

具体的问题堆栈如下:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
	at JoinTableFuncCollector$9.collect(Unknown Source)
	at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
	at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
	at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
	at LookupFunction$4.flatMap(Unknown Source)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at StreamExecCalc$67.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
	at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
	at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
	at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
	at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
	at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
	at StreamExecCorrelate$27.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at StreamExecCalc$17.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

其实这是mysql和flink SQL在做字段类型映射的时候,会出现的类型匹配问题:
查看一下对应mysql字段如下:

+-----------------------+---------------------+------+-----+---------+----------------+
| Field                 | Type                | Null | Key | Default | Extra          |
+-----------------------+---------------------+------+-----+---------+----------------+
| merchantId            | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId                | bigint(20)          | NO   | MUL | NULL    |                |
| status                | tinyint(1) unsigned | NO   |     | 1       |                |

再参考Flink Data Type Mapping,如下:

MySQL typeFlink SQL type
INT UNSIGNEDBIGINT
BIGINTBIGINT
TINYINTTINYINT
INTINT

注意: 对于MYSQL类型是INT UNSIGNED 的字段,映射到FLINk应该为BIGINT类型,而不是INT类型

否则会报如下错误:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer

解决

把对一个的类型修改一下可以了:

CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  
);
           ||
           \\/
CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  INT
) WITH (
  
);

以上是关于FLINK JDBC SQL Connector遇到的类型转换问题的主要内容,如果未能解决你的问题,请参考以下文章

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink: FlieSystem SQL Connector

Flink SQL Print Connector