Flink SQL --JDBC connector

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL --JDBC connector相关的知识,希望对你有一定的参考价值。


基于 Flink-1.12

一、Table API & SQL

二、SQL Client

2.1、配置

1、添加依赖jar

flink-connector-jdbc_2.11-1.12.3.jar
mysql-connector-java-5.1.48.jar

2、重启flink

stop-cluster.sh 
start-cluster.sh

注意: 如果不重启的话,无法加载到上面的依赖jar
java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
或者

Flink SQL> select * from t_sum;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

3、启动SQL Client

./sql-client.sh embedded

2.2、创建表

2.2.1、MySQL中创建表

create table t_sum(
 cnt int
);

2.2.2、Flink SQL Client创建表


create table t_sum(
 cnt int
)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://chb1:3306/flinktest',
'table-name' = 't_sum',
'username' = 'root',
'password' = '123456'
);

2.3、测试

2.3.1、在Flink SQL Client插入数据


Flink SQL> insert into t_sum values (10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7cc4b890a7cb551dd31d38d0d4613c60

Flink SQL> 

查看MySQL中是否插入数据

2.3.1、在MySQL插入数据

mysql> insert into t_sum value(20);
Query OK, 1 row affected (0.00 sec)

mysql> 

Flink SQL Client中也更新了

2.4、由于使用的是默认catalog,没有持久存储,所以 Flink SQL Client退出重新登录,数据没了

参考

JDBC SQL Connector

关注我的公众号【宝哥大数据】, 更多干货。。。

以上是关于Flink SQL --JDBC connector的主要内容,如果未能解决你的问题,请参考以下文章

jmeter 调用mysql数据库,使用JDBC请求执行相关SQL

mybatis框架学习-前置,复习Jdbc

Flink输出到JDBC

flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp

Flink SQL DDL

Flink 1.17 Flink-SQL-Gateway HiveServer2 源码分析