Flink-1.11开始提供了JDBC Sink
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink-1.11开始提供了JDBC Sink相关的知识,希望对你有一定的参考价值。
前言: Flink-1.10及以前,没有提供JDBC Sink, 使用自定义的Sink – 3.4、自定义Sink
Flink-1.11之后,官方connector提供了JDBC Sink
1、添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.12.3</version>
</dependency>
注意该连接器目前还 不是 二进制发行版的一部分,如何在集群中运行请参考 这里。
已创建的 JDBC Sink 能够保证至少一次的语义。 更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。
2、通过JdbcSink.sink(...)
创建一个JDBC Sink
val jdbcSink = JdbcSink.sink(
"insert into person (id, name, age) values (?, ?, ?)",
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
ps.setInt(1, u._1)
ps.setString(2, u._2)
ps.setInt(3, u._3)
}
},
JdbcExecutionOptions.builder().withBatchSize(50).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
.withUsername("root")
.withPassword("123456").build()
)
3、完整代码
package com.chb.flink.connectors.ds
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.sql.PreparedStatement
// Flink-1.11开始提供了JDBC Sink
object JdbcConnectorDemo {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val jdbcSink = JdbcSink.sink(
"insert into person (id, name, age) values (?, ?, ?)",
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
ps.setInt(1, u._1)
ps.setString(2, u._2)
ps.setInt(3, u._3)
}
},
JdbcExecutionOptions.builder().withBatchSize(50).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
.withUsername("root")
.withPassword("123456").build()
)
streamEnv.fromElements((1, "chb", 23), (2, "ling", 18))
.addSink(jdbcSink)
streamEnv.execute("jdbc sink")
}
}
关注我的公众号【宝哥大数据】, 更多干货
以上是关于Flink-1.11开始提供了JDBC Sink的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.11 Unaligned Checkpoint 解析
Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF
Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF