FLINK 读取MYSQL数据-通过JDBC方式

Posted 又到花开时

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 读取MYSQL数据-通过JDBC方式相关的知识,希望对你有一定的参考价值。

package quickstart.batch;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class ReadFrommysql {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
//配置下相关信息,以及取数逻辑
DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.65.181/mbi_dev")
.setUsername("MobileBI")
.setPassword("ei2musv+@{U4")
.setQuery("select '1','a' from dual")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish());

  //转化输出
Table table1 = tableEnv.fromDataSet(dataSource);
DataSet<Row> rowDataSet = tableEnv.toDataSet(table1, new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
rowDataSet.print();

// result.output(JDBCOutputFormat.buildJDBCOutputFormat()
// .setDrivername("com.mysql.jdbc.Driver")
// .setDBUrl("jdbc:mysql://localhost:3306")
// .setUsername("root")
// .setPassword("root")
// .setQuery("insert into flink.test (id,type) values (?,?)")
// .setSqlTypes(new int[]{Types.INTEGER, Types.NCHAR})
// .finish());
//
// env.execute("flink-test");

}
}

以上是关于FLINK 读取MYSQL数据-通过JDBC方式的主要内容,如果未能解决你的问题,请参考以下文章

Flink CDC 读取MySQL的数据

Flink CookBook- JDBC Table Source并发详解

java(JDBC)用获取文件的方式连接数据库错误?

后端开发JDBC获取数据库连接的方式

后端开发JDBC获取数据库连接的方式

flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp