FLINK 读取MYSQL数据-通过JDBC方式
Posted 又到花开时
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 读取MYSQL数据-通过JDBC方式相关的知识,希望对你有一定的参考价值。
package quickstart.batch;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
public class ReadFrommysql {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
//配置下相关信息,以及取数逻辑
DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.65.181/mbi_dev")
.setUsername("MobileBI")
.setPassword("ei2musv+@{U4")
.setQuery("select '1','a' from dual")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish());
//转化输出
Table table1 = tableEnv.fromDataSet(dataSource);
DataSet<Row> rowDataSet = tableEnv.toDataSet(table1, new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
rowDataSet.print();
// result.output(JDBCOutputFormat.buildJDBCOutputFormat()
// .setDrivername("com.mysql.jdbc.Driver")
// .setDBUrl("jdbc:mysql://localhost:3306")
// .setUsername("root")
// .setPassword("root")
// .setQuery("insert into flink.test (id,type) values (?,?)")
// .setSqlTypes(new int[]{Types.INTEGER, Types.NCHAR})
// .finish());
//
// env.execute("flink-test");
}
}
以上是关于FLINK 读取MYSQL数据-通过JDBC方式的主要内容,如果未能解决你的问题,请参考以下文章
Flink CookBook- JDBC Table Source并发详解
flink JDBC 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp