大数据ClickHouse(十九):Flink 写入 ClickHouse API
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据ClickHouse(十九):Flink 写入 ClickHouse API相关的知识,希望对你有一定的参考价值。
文章目录
一、Flink 1.10.x之前版本使用flink-jdbc,只支持Table API
二、Flink 1.11.x之后版本使用flink-connector-jdbc,只支持DataStream API
Flink 写入 ClickHouse API
可以通过Flink原生JDBC Connector包将Flink结果写入ClickHouse中,Flink在1.11.0版本对其JDBC Connnector进行了重构:
- 重构之前(1.10.x 及之前版本),包名为 flink-jdbc 。
- 重构之后(1.11.x 及之后版本),包名为 flink-connector-jdbc 。
二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:
API名称 | flink-jdbc | flink-connector-jdbc |
DataStream | 不支持 | 支持 |
Table API | 支持 | 不支持 |
一、Flink 1.10.x之前版本使用flink-jdbc,只支持Table API
- 示例
1、maven中需要导入以下包
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
2、代码
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到ClickHouse中,只支持Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1
def main(args: Array[String]): Unit =
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为1,后期每个并行度满批次需要的条数时,会插入click中
env.setParallelism(1)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取Socket中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line =>
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
)
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将table 对象写入ClickHouse中
//需要在ClickHouse中创建表:create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认5000条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册ClickHouse table Sink,设置sink 数据的字段及Schema信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
二、Flink 1.11.x之后版本使用flink-connector-jdbc,只支持DataStream API
- 示例
1、在Maven中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
2、代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入ClickHouse ,目前只支持DataStream API
*/
object FlinkWriteToClickHouse2
def main(args: Array[String]): Unit =
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line =>
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
)
//准备向ClickHouse中插入数据的sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据SQL
insetIntoCkSql,
//设置插入ClickHouse数据的参数
new JdbcStatementBuilder[(Int, String, Int)]
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit =
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
,
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接ClickHouse的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于大数据ClickHouse(十九):Flink 写入 ClickHouse API的主要内容,如果未能解决你的问题,请参考以下文章
vip视频教程 IT自学视频教程 编程教程Flink+ClickHouse 玩转企业级实时大数据开发
2021年大数据Flink(三十九):Table与SQL 总结 Flink-SQL常用算子
ElasticSearch实战(四十九)-Flink 大数据实时同步方案
ElasticSearch实战(四十九)-Flink 大数据实时同步方案