Flink 版本数据湖(hudi)实时数仓---flinkcdc hudi kafka hive
Posted wudl5566
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 版本数据湖(hudi)实时数仓---flinkcdc hudi kafka hive相关的知识,希望对你有一定的参考价值。
1.架构图
2.实现实例
2.1 通过 Flink CDC 读取两张表,关联合并成一张视图,并同时写入数据湖(Hudi)和 Kafka
2.2 实现思路
1.在flinksql 中创建flink cdc 表
2.创建视图(将两张表关联后所需的列作为一张视图)
3.创建输出表,关联 Hudi 表,并自动同步到 Hive 表(注:下方示例代码中的 Hudi 表尚未配置 Hive 同步参数)
4.查询视图数据,插入到输出表 -- flink 后台实时执行
2.3 pom 文件所需的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wudl-hudi</artifactId>
        <groupId>wudl-hudi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink13.5-hudi</artifactId>

    <!-- Repositories, in lookup order: aliyun, apache snapshots, cloudera, spring plugins. -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <!-- https: Maven 3.8.1+ blocks plain-http repositories by default. -->
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>spring-plugin</id>
            <!-- NOTE(review): confirm this repository is still reachable anonymously. -->
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>
    </repositories>

    <!-- Maven only interpolates ${name}; a bare $name is passed through literally. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.13.5</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
        <flink-mysql-cdc>2.0.2</flink-mysql-cdc>
    </properties>

    <dependencies>
        <!-- JDBC connector: since Flink 1.11 the artifact is flink-connector-jdbc and must match
             the Flink version (the old flink-jdbc_2.12:1.10.3 was a version mismatch). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-mysql-cdc}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

        <!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Declared once; it was previously listed twice, which Maven reports as a duplicate. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Lombok is a compile-time annotation processor; keep it out of the fat jar. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

        <!-- slf4j and log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a jar-with-dependencies during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2.4 代码实现
package com.wudl.hudi.sink;
//import org.apache.flink.api.common.restartstrategy.RestartStrategies;
//import org.apache.flink.runtime.state.filesystem.FsStateBackend;
//import org.apache.flink.streaming.api.CheckpointingMode;
//import org.apache.flink.streaming.api.environment.CheckpointConfig;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.table.api.EnvironmentSettings;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.TableResult;
//import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
//
//import static org.apache.flink.table.api.Expressions.$;
import com.wudl.hudi.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
/**
* @author :wudl
* @date :Created in 2022-02-19 22:18
* @description:
* @modified By:
* @version: 1.0
*/
public class MysqlJoinMysqlHuDi
public static void main(String[] args) throws Exception
// 1-获取表执行环境getExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO: 由于增量将数据写入到Hudi表,所以需要启动Flink Checkpoint检查点
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.1 开启CK
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(10000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//正常Cancel任务时,保留最后一次CK
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//状态后端
env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
//设置访问HDFS的用户名
System.setProperty("HADOOP_USER_NAME", "root");
// 2-创建输入表,TODO:从Kafka消费数据
tableEnv.executeSql(
"CREATE TABLE IF NOT EXISTS source_mysql ( " +
" id BIGINT primary key NOT ENFORCED ," +
" name string," +
" age int ," +
" birthday TIMESTAMP(3)," +
" ts TIMESTAMP(3)" +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.1.162', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '123456', " +
" 'server-time-zone' = 'Asia/Shanghai', " +
" 'scan.startup.mode' = 'initial', " +
" 'database-name' = 'wudldb', " +
" 'table-name' = 'Flink_cdc' " +
" )");
tableEnv.executeSql(
"CREATE TABLE IF NOT EXISTS source_mysql_Flink_cdd ( " +
" id BIGINT primary key NOT ENFORCED ," +
" phone string," +
" address string ," +
" ts TIMESTAMP(3)" +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.1.162', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '123456', " +
" 'server-time-zone' = 'Asia/Shanghai', " +
" 'scan.startup.mode' = 'initial', " +
" 'database-name' = 'wudldb', " +
" 'table-name' = 'Flink_cdd' " +
" )");
String joinSql = "SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id";
Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);
// 4-创建输出表,TODO: 关联到Hudi表,指定Hudi表名称,存储路径,字段名称等等信息
tableEnv.createTemporaryView("viewFlinkCdc",tableMysqlJoin);
tableEnv.executeSql(
"CREATE TABLE myslqjoinmysqlhudiSink (" +
" id BIGINT PRIMARY KEY NOT ENFORCED," +
" name STRING," +
" age INT," +
" birthday STRING," +
" phone STRING," +
" address STRING," +
" ts STRING" +
")" +
"WITH (" +
" 'connector' = 'hudi'," +
" 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
// " 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink' ,\\n" +
" 'table.type' = 'MERGE_ON_READ'," +
" 'write.operation' = 'upsert'," +
" 'hoodie.datasource.write.recordkey.field'= 'id'," +
" 'write.precombine.field' = 'ts'," +
" 'write.tasks'= '1'" +
")"
);
TableResult kafkaSink = tableEnv.executeSql(
"CREATE TABLE flinkCdc_kafka_Sink (" +
" id BIGINT NOT NULL," +
" name STRING," +
" age INT," +
" birthday STRING," +
" phone STRING," +
" address STRING," +
" ts STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'sinktest'," +
" 'scan.startup.mode' = 'earliest-offset', "+
" 'properties.bootstrap.servers' = '192.168.1.161:6667'," +
" 'format' = 'debezium-json'," +
" 'debezium-json.ignore-parse-errors'='true' " +
")"
);
// // 5-通过子查询方式,将数据写入输出表
tableEnv.executeSql(
"INSERT INTO myslqjoinmysqlhudiSink " +
"SELECT id,name,age,birthday,phone,address, ts FROM viewFlinkCdc"
);
tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();
tableEnv.executeSql("insert into flinkCdc_kafka_Sink SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id ");
// tableEnv.executeSql("insert into myslqjoinmysqlhudiSink SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id ");
// tableEnv.executeSql("insert into flinkCdc_kafka_Sink select id,name,age,CAST(birthday as STRING) birthday ,phone, address,CAST(ts AS STRING) ts from myslqjoinmysqlhudiSink ");
// tableEnv.executeSql("insert into flinkcdc_hudi_sink select id,name,age,CAST(birthday as STRING) birthday, CAST(ts as STRING) ts from source_mysql ");
System.out.println("--------------------------");
2.5 mysql 表结构
-- Source table for the Flink CDC table source_mysql (database wudldb).
-- Integer display widths such as bigint(64)/int(20) are dropped: they do not affect storage
-- and are deprecated since MySQL 8.0.17.
CREATE TABLE `Flink_cdc` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4;
#*********************************************************************************
-- Source table for the Flink CDC table source_mysql_Flink_cdd (database wudldb).
-- ts is refreshed on every update; it is used as the Hudi precombine field downstream.
CREATE TABLE `Flink_cdd` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `phone` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4;
2.6 MySQL 测试数据生成代码实现
package com.wudl.hudi.source;
import com.alibaba.fastjson.JSON;
import com.wudl.hudi.entity.FlinkCdcBean;
import com.wudl.hudi.entity.Order;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author :wudl
* @date :Created in 2022-02-19 14:24
* @description:
* @modified By:
* @version: 1.0
*/
public class GenerateMysqlFlinkCdcBean implements SourceFunction<FlinkCdcBean>
private boolean isRunning = true;
String[] citys = "北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港", "山西", "陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽", "海南", "江西", "湖北", "山西", "辽宁", "内蒙古";
Integer i = 0;
List<Order> list = new ArrayList<>();
@Override
public void run(SourceContext<FlinkCdcBean> ctx) throws Exception
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning)
int number = random.nextInt(4) + 1;
String name = getChineseName();
String address = citys[random.nextInt(citys.length)];
int age = random.nextInt(25);
String birthday = getDate();
String phone = getTel();
java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime());
FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address);
ctx.collect(flinkCdcBean);
/**
* 获取当前时间
*
* @return
*/
public static String getDate() throws InterruptedException
Calendar calendar = Calendar.getInstance();
Date date = calendar.getTime();
String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date);
Thread.sleep(10);
return dataStr;
public static int getNum(int start, int end)
return (int) (Math.random() * (end - start + 1) + start);
private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
/**
* 获取手机号码
*
* @return
*/
private static String getTel()
int index = getNum(0, telFirst.length - 1);
String first = telFirst[index];
String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
return first + second + third;
@Override
public void cancel()
isRunning = false;
private static String 以上是关于Flink 版本数据湖(hudi)实时数仓---flinkcdc hudi kafak hive的主要内容,如果未能解决你的问题,请参考以下文章
Flink 实战系列Flink CDC 实时同步 Mysql 全量加增量数据到 Hudi