Flink data lake (Hudi) real-time data warehouse --- Flink CDC + Hudi + Kafka + Hive

Posted by wudl5566



1. Architecture diagram

2. Implementation example

2.1 Join two Flink CDC tables into a single view, and write the result both to the data lake (Hudi) and to Kafka

2.2 Implementation approach

1. Create the Flink CDC source tables in Flink SQL.
2. Create a view exposing the columns needed from the join of the two tables.
3. Create the output table, backed by a Hudi table, with automatic sync to a Hive table.
4. Query the view and insert the results into the output table; Flink keeps executing this in the background.

2.3 Dependencies required in the pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wudl-hudi</artifactId>
        <groupId>wudl-hudi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink13.5-hudi</artifactId>


    <!-- Repository locations: aliyun, apache, and cloudera, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>

        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>

    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.13.5</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
        <flink-mysql-cdc>2.0.2</flink-mysql-cdc>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-mysql-cdc}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>


        <!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- slf4j and log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.4 Code implementation

package com.wudl.hudi.sink;


import com.wudl.hudi.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 22:18
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class MysqlJoinMysqlHuDi {
    public static void main(String[] args) throws Exception {
        // 1 - Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental writes into a Hudi table require Flink checkpointing to be enabled
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // 1.1 Enable checkpointing
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the latest checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
        // User name for HDFS access
        System.setProperty("HADOOP_USER_NAME", "root");

        // 2 - Create the input tables: MySQL CDC sources
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS   source_mysql ( " +
                        "  id BIGINT  primary key NOT ENFORCED ," +
                        "  name string," +
                        "  age int ," +
                        "  birthday TIMESTAMP(3)," +
                        "  ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdc' " +
                        " )");

        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS   source_mysql_Flink_cdd ( " +
                        "  id BIGINT  primary key NOT ENFORCED ," +
                        "  phone string," +
                        "  address string ," +
                        "  ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdd' " +
                        " )");
        String joinSql = "SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd  a INNER JOIN   source_mysql b ON a.id = b.id";
        Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);


        // 3 - Register the join result as a view
        tableEnv.createTemporaryView("viewFlinkCdc", tableMysqlJoin);
        // 4 - Create the output table backed by the Hudi table: table name, storage path, fields, etc.
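        // NOTE (addition): joining two CDC changelog streams yields an updating stream
        // (rows may be retracted and re-emitted as either side changes), which is why
        // the Hudi sink below uses 'write.operation' = 'upsert' with 'id' as the
        // record key and 'ts' as the precombine field.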
        tableEnv.executeSql(
                "CREATE TABLE myslqjoinmysqlhudiSink (" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ")" +
                        "WITH (" +
                        " 'connector' = 'hudi'," +
                        " 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
//                      " 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink' ,\\n" +
                        " 'table.type' = 'MERGE_ON_READ'," +
                        " 'write.operation' = 'upsert'," +
                        " 'hoodie.datasource.write.recordkey.field'= 'id'," +
                        " 'write.precombine.field' = 'ts'," +
                        " 'write.tasks'= '1'" +
                        ")"
        );
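        // NOTE (addition): step 3 of section 2.2 promises automatic sync into a Hive
        // table, but the DDL above writes to a local path and carries no Hive options.
        // A hedged sketch of the extra WITH options (Hudi 0.10 Flink option names; the
        // metastore URI below is a hypothetical example value):
        //   'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/myslqjoinmysqlhudiSink',
        //   'hive_sync.enable' = 'true',
        //   'hive_sync.mode' = 'hms',
        //   'hive_sync.metastore.uris' = 'thrift://192.168.1.161:9083',
        //   'hive_sync.db' = 'default',
        //   'hive_sync.table' = 'myslqjoinmysqlhudiSink'
        // For a MERGE_ON_READ table, the sync creates <table>_ro and <table>_rt views in Hive.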
        TableResult kafkaSink = tableEnv.executeSql(
                "CREATE TABLE flinkCdc_kafka_Sink (" +
                        "  id BIGINT NOT NULL," +
                        "  name STRING," +
                        "  age INT," +
                        "  birthday STRING," +
                        "  phone STRING," +
                        "  address STRING," +
                        "  ts STRING" +
                        ") WITH (" +
                        "  'connector' = 'kafka'," +
                        "  'topic' = 'sinktest'," +
                        "  'scan.startup.mode' = 'earliest-offset', "+
                        "  'properties.bootstrap.servers' = '192.168.1.161:6667'," +
                        "  'format' = 'debezium-json'," +
                        "    'debezium-json.ignore-parse-errors'='true' " +
                        ")"
        );
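        // NOTE (addition): 'scan.startup.mode' only takes effect when this Kafka table
        // is read as a source; the INSERT below ignores it. Because the format is
        // debezium-json, the topic carries changelog records, so any downstream
        // consumer must understand the Debezium envelope.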

        // 5 - Write the view's data into the Hudi output table via a sub-query
        tableEnv.executeSql(
                "INSERT INTO myslqjoinmysqlhudiSink " +
                        "SELECT id,name,age,birthday,phone,address, ts FROM viewFlinkCdc"
        );


            tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
            tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();


        tableEnv.executeSql("insert into flinkCdc_kafka_Sink  SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd  a INNER JOIN   source_mysql b ON a.id = b.id ");
      //  tableEnv.executeSql("insert into myslqjoinmysqlhudiSink  SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd  a INNER JOIN   source_mysql b ON a.id = b.id ");
     //   tableEnv.executeSql("insert into flinkCdc_kafka_Sink  select id,name,age,CAST(birthday as STRING) birthday ,phone, address,CAST(ts AS STRING) ts  from myslqjoinmysqlhudiSink ");

//        tableEnv.executeSql("insert into flinkcdc_hudi_sink  select id,name,age,CAST(birthday as STRING) birthday,  CAST(ts as STRING) ts  from source_mysql ");
        System.out.println("--------------------------");
    }
}
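Note that in Flink 1.13 each executeSql("INSERT ...") call submits its own job asynchronously, so the Hudi insert and the Kafka insert above actually run as two separate Flink jobs. If a single job is preferred, the two inserts can be grouped with a StatementSet; a minimal sketch that would replace the two INSERT calls inside main() (add an import of org.apache.flink.table.api.StatementSet):

        // Group both INSERTs into one Flink job (Flink 1.13 StatementSet API)
        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO myslqjoinmysqlhudiSink "
                + "SELECT id, name, age, birthday, phone, address, ts FROM viewFlinkCdc");
        stmtSet.addInsertSql("INSERT INTO flinkCdc_kafka_Sink "
                + "SELECT id, name, age, birthday, phone, address, ts FROM viewFlinkCdc");
        stmtSet.execute(); // submits one job containing both sinks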


2.5 MySQL table structure

CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4
#*********************************************************************************
CREATE TABLE `Flink_cdd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `phone` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4

2.6 MySQL test-data generator code

package com.wudl.hudi.source;

import com.alibaba.fastjson.JSON;
import com.wudl.hudi.entity.FlinkCdcBean;
import com.wudl.hudi.entity.Order;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 14:24
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class GenerateMysqlFlinkCdcBean implements SourceFunction<FlinkCdcBean> {

    private boolean isRunning = true;
    String[] citys = {"北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港", "山西", "陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽", "海南", "江西", "湖北", "山西", "辽宁", "内蒙古"};
    Integer i = 0;
    List<Order> list = new ArrayList<>();

    @Override
    public void run(SourceContext<FlinkCdcBean> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            int number = random.nextInt(4) + 1;

            String name = getChineseName();

            String address = citys[random.nextInt(citys.length)];
            int age = random.nextInt(25);
            String birthday = getDate();
            String phone = getTel();
            java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime());
            FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address);
            ctx.collect(flinkCdcBean);
        }
    }

    /**
     * Get the current time as a formatted string
     *
     * @return formatted timestamp string
     */
    public static String getDate() throws InterruptedException {
        Calendar calendar = Calendar.getInstance();
        Date date = calendar.getTime();
        String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date);
        Thread.sleep(10);
        return dataStr;
    }

    public static int getNum(int start, int end) {
        return (int) (Math.random() * (end - start + 1) + start);
    }

    private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");

    /**
     * Generate a random phone number
     *
     * @return phone number string
     */
    private static String getTel() {
        int index = getNum(0, telFirst.length - 1);
        String first = telFirst[index];
        String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
        String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
        return first + second + third;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Generate a random Chinese name. The original method body was truncated in the
     * source; this is a minimal reconstruction with hypothetical sample characters.
     */
    private static String getChineseName() {
        String[] surnames = {"赵", "钱", "孙", "李", "周", "吴", "郑", "王"};
        String[] givenNames = {"伟", "芳", "娜", "敏", "静", "磊", "军", "洋"};
        return surnames[getNum(0, surnames.length - 1)] + givenNames[getNum(0, givenNames.length - 1)];
    }
}
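The article never shows the driver that actually runs this generator against MySQL. A minimal sketch under a few assumptions: the class name GenerateMysqlFlinkCdcBeanDriver is hypothetical, FlinkCdcBean is assumed to expose getName()/getAge() getters (the entity class is not shown in the article), and it inserts only name and age, since the Flink_cdc table in section 2.5 defaults birthday and ts to CURRENT_TIMESTAMP. It uses plain java.sql JDBC rather than the flink-jdbc dependency from the pom.

package com.wudl.hudi.source;

import com.wudl.hudi.entity.FlinkCdcBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GenerateMysqlFlinkCdcBeanDriver {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new GenerateMysqlFlinkCdcBean())
                // Plain-JDBC sink: one INSERT per generated bean
                .addSink(new RichSinkFunction<FlinkCdcBean>() {
                    private transient Connection conn;
                    private transient PreparedStatement ps;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection(
                                "jdbc:mysql://192.168.1.162:3306/wudldb?serverTimezone=Asia/Shanghai",
                                "root", "123456");
                        ps = conn.prepareStatement("INSERT INTO Flink_cdc (name, age) VALUES (?, ?)");
                    }

                    @Override
                    public void invoke(FlinkCdcBean value, Context context) throws Exception {
                        ps.setString(1, value.getName()); // assumed getter
                        ps.setInt(2, value.getAge());     // assumed getter
                        ps.executeUpdate();
                    }

                    @Override
                    public void close() throws Exception {
                        if (ps != null) ps.close();
                        if (conn != null) conn.close();
                    }
                });
        env.execute("generate-flink-cdc-data");
    }
}

With rows flowing into Flink_cdc (and matching rows in Flink_cdd), the CDC join in section 2.4 starts emitting results to both the Hudi table and the Kafka topic.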
