Hudi Learning (7): Integrating Hudi with Flink


I. Environment Preparation

  1. Base framework versions

Framework        Version   Notes
Apache Hadoop    2.7.5
Apache Hudi      0.9.0
Apache Flink     1.12      Built for Scala 2.12

  2. Installing and deploying the execution environment

    2.1 Flink framework overview

        Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computation at in-memory speed and at any scale.

   2.2 Installation and deployment

        This document uses Flink 1.12 and deploys a Flink standalone cluster.

        1) Download the installation package from the official website.

         2) Upload the package to the target directory and extract it:

tar -zxvf flink-1.12.2-bin-scala_2.12.tgz -C /opt/module/

        3) Start the local Flink cluster:

bin/start-cluster.sh

        4) Open the Flink Web UI:

http://hadoop100:8081/#/overview

         5) Run the official example. Once the Flink job completes successfully, the Flink installation and deployment are done.

bin/flink run examples/batch/WordCount.jar

 

    2.3 Adding the Hudi dependency

        1) Add the Hudi bundle jar.

        From the compiled Hudi build directory /hudi-0.9.0/packaging/hudi-flink-bundle/target/, copy hudi-flink-bundle_2.12-0.9.0.jar into the lib directory of the Flink installation.

 

    2.4 Setting the Hadoop environment variables

vim /etc/profile.d/env.sh  (adjust this to the environment variable file used on your machine)

##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

    2.5 Starting the cluster

        1) Set the Hadoop classpath environment variable (this step is important):

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

        2) Start Flink:

bin/start-cluster.sh

        3) Start the Flink SQL Client:

bin/sql-client.sh embedded shell

         4) Set the result mode to tableau so results are displayed directly, and set the execution mode to batch:

set execution.result-mode = tableau;
set execution.type = batch;

II. Working with Hudi from the Flink SQL Client

  1. Create a MOR table

        (If table.type is not specified, the table defaults to COPY_ON_WRITE; a COW variant is sketched after the MOR example below.)

CREATE TABLE mor_test1( 
    uuid        VARCHAR(20), 
    name        VARCHAR(10), 
    age         INT, 
    ts          TIMESTAMP(3), 
    `partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH ( 
'connector' = 'hudi', 
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/mor_test1', 
'table.type' = 'MERGE_ON_READ'
);
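        For comparison, a minimal sketch of the same schema as a COPY_ON_WRITE table; the table name cow_test1 and its path are illustrative only. Leaving out table.type (or setting it to COPY_ON_WRITE) yields a COW table:

CREATE TABLE cow_test1( 
    uuid        VARCHAR(20), 
    name        VARCHAR(10), 
    age         INT, 
    ts          TIMESTAMP(3), 
    `partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH ( 
'connector' = 'hudi', 
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/cow_test1'
);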

 

  2. Insert data

    2.1 Insert multiple rows

INSERT INTO mor_test1 VALUES 
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), 
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), 
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), 
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), 
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

    2.2 After the insert succeeds, query the data

        1) Query with SQL (see the query sketch after this list).

         2) The job's execution status can be seen in the Flink Web UI.

         3) Inspect the Hudi table data on HDFS.
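        A minimal query sketch for step 1), assuming batch mode and the mor_test1 table created above:

select * from mor_test1;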

  3. Update data

        When operating on Hudi from Flink SQL, many options do not have to be set one by one as they do with Spark; they all have default values. We did not specify a record key when inserting data, as we would in Spark, yet the Flink table does have one: the record key field defaults to uuid. A sketch of declaring it explicitly follows.
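        A hedged sketch of making the record key explicit instead of relying on the uuid default. The table name mor_test1_pk and its path are illustrative only; either the PRIMARY KEY clause below or the 'hoodie.datasource.write.recordkey.field' option (used later in this article) identifies the key field:

CREATE TABLE mor_test1_pk( 
    uuid        VARCHAR(20) PRIMARY KEY NOT ENFORCED, 
    name        VARCHAR(10), 
    age         INT, 
    ts          TIMESTAMP(3), 
    `partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH ( 
'connector' = 'hudi', 
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/mor_test1_pk', 
'table.type' = 'MERGE_ON_READ'
);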

    3.1 Update the row whose uuid is id1

        Change the age from 23 to 27. (Unless the very first insert after table creation uses overwrite mode, subsequent Flink SQL writes default to append mode, so re-inserting a row with an existing key directly triggers an update.) A verification query is sketched after the statement below.

 insert into mor_test1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
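        To verify the upsert, a minimal check (still in batch mode) could be:

select * from mor_test1 where uuid = 'id1';
-- the row for id1 is expected to show age 27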

III. Streaming query

        A streaming read cannot be demonstrated within a single Flink SQL Client session; two clients are needed: one writes data into the table while the other reads it in real time.

  1. In another client, create the table mor_test2

        Set the relevant options to read in streaming mode and map this table onto the earlier mor_test1 table: its storage path is the same as mor_test1's, so it reads mor_test1's data in real time.

CREATE TABLE mor_test2( 
    uuid        VARCHAR(20), 
    name        VARCHAR(10), 
    age         INT, 
    ts          TIMESTAMP(3), 
    `partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH ( 
'connector' = 'hudi', 
'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/mor_test1', 
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '2022070323131133',
'read.streaming.check-interval' = '4'
);

        Option notes

read.streaming.enabled set to true means the table data is read in streaming mode;
read.streaming.start-commit specifies the commit instant from which the streaming read starts;
read.streaming.check-interval sets the interval at which the source checks for new commits, here 4 s;

  2. Querying the mor_test2 table at first reports an error

         1) Set the execution mode to streaming and run the query again:

set execution.type=streaming;

        2) The streaming query now succeeds (a query sketch follows).
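        A minimal sketch of the streaming read, assuming the mor_test2 mapping table defined above; with read.streaming.enabled the query keeps running and prints new commits as they arrive:

select * from mor_test2;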

  3. Streaming query in code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamingQueryHudiDemo {   // wrapper class (name illustrative)

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);

        // Create the table
        tableEnv.executeSql("" +
                "CREATE TABLE mor_test2( \n" +
                "    uuid        VARCHAR(20), \n" +
                "    name        VARCHAR(10), \n" +
                "    age         INT, \n" +
                "    ts          TIMESTAMP(3), \n" +
                "    `partition` VARCHAR(20)\n" +
                ")PARTITIONED BY (`partition`)\n" +
                "WITH ( \n" +
                "'connector' = 'hudi', \n" +
                "'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/mor_test1', \n" +
                "'table.type' = 'MERGE_ON_READ',\n" +
                "'read.streaming.enabled' = 'true',\n" +
                "'read.streaming.start-commit' = '2022070323131133',\n" +
                "'read.streaming.check-interval' = '4'\n" +
                ")");
        tableEnv.executeSql("select * from mor_test2").print();
    }
}

        The job keeps running, waiting for new data to be written.

IV. Writing to Hudi in real time with Flink SQL

  1. Overall architecture

        The Structured Streaming program from the earlier article is rewritten as a Flink SQL program that consumes topic data from Kafka in real time, parses and transforms it, and stores it in a Hudi table, as illustrated by the architecture diagram.

  2. Consuming Kafka data

    2.1 Create the Kafka topic (hudi_flink)

bin/kafka-topics.sh --zookeeper hadoop100:2181 --create --replication-factor 1 --partitions 1 --topic hudi_flink

    2.2 Consume the Kafka data from the Flink SQL Client

        1) Create the Kafka source table:

CREATE TABLE hudi_flink_kafka_source (
  orderId STRING,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'hudi_flink',
  'properties.bootstrap.servers' = 'hadoop100:9092',
  'properties.group.id' = 'gid-1001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

         2) Query the hudi_flink_kafka_source table (a query sketch follows the sample record below).

        Remember to add the Flink SQL Kafka connector jar that matches your Kafka version; it can be downloaded from the official website.

         When we write data into the Kafka topic, Flink SQL consumes it almost immediately.

{"orderId": "20211122103434136000001", "userId": "300000971", "orderTime": "2021-11-22 10:34:34.136", "ip": "123.232.118.98", "orderMoney": 485.48, "orderStatus": 0}

  3. Consume Kafka data and write it into a Hudi table

    3.1 Create the Hudi sink table

CREATE TABLE hudi_flink_kafka_sink (
  orderId STRING PRIMARY KEY NOT ENFORCED,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT,
  ts STRING,
  partition_day STRING
)
PARTITIONED BY (partition_day) 
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field'= 'orderId',
  'write.precombine.field' = 'ts',
  'write.tasks'= '1',
  'compaction.tasks' = '1', 
  'compaction.async.enabled' = 'true', 
  'compaction.trigger.strategy' = 'num_commits', 
  'compaction.delta_commits' = '1'
);

    3.2 Write the data into Hudi

        1) Read from the hudi_flink_kafka_source table, then write into the hudi_flink_kafka_sink table.

                Why does the hudi_flink_kafka_source data need extra processing? Writing to Hudi relies on three configured fields: the record key field (default uuid), the precombine field used to merge records (default name ts), and the partition field. The record key ('hoodie.datasource.write.recordkey.field' = 'orderId') and the precombine field ('write.precombine.field' = 'ts') were already specified when the Hudi table was created, so the INSERT below derives the ts and partition_day columns.

INSERT INTO hudi_flink_kafka_sink 
SELECT
  orderId, userId, orderTime, ip, orderMoney, orderStatus,
  substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day 
FROM hudi_flink_kafka_source ;

        2) Query the hudi_flink_kafka_sink table (see the sketch after this list).

         3) Check the files on HDFS.
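        A minimal query sketch for step 2), assuming the hudi_flink_kafka_sink table created above:

select * from hudi_flink_kafka_sink;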

    3.3 Consume Kafka data and write to Hudi in code

        1) Simulate a Kafka producer:

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
  * @author oyl
  * @create 2022-07-10 10:44
  * @Description Simulates a Kafka producer that generates order data
  */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {

    // 1. Kafka producer configuration
    val pro: Properties = new Properties()
    pro.put("bootstrap.servers", "hadoop100:9092,hadoop101:9092")
    pro.put("acks", "1")
    pro.put("retries", "3")
    pro.put("key.serializer", classOf[StringSerializer].getName)
    pro.put("value.serializer", classOf[StringSerializer].getName)

    // 2. Create the KafkaProducer with the configuration
    val myProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](pro)

    // Random number generator
    val random: Random = new Random()

    // Possible order statuses
    val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0)

    // Number of records to produce in this batch
    val batchNumber: Int = random.nextInt(1) + 5

    try {
      (1 to batchNumber).foreach { number =>
        val currentTime: Long = System.currentTimeMillis()
        val orderId: String = s"${getDate(currentTime)}%06d".format(number)
        val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
        val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
        val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
        val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
        // 3. Build the order record
        val orderRecord: OrderRecord = OrderRecord(
          orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
        )
        // Serialize to JSON
        val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
        println(orderJson)
        // 4. Build the ProducerRecord
        val record = new ProducerRecord[String, String]("hudi_flink", orderId, orderJson)
        // 5. Send the record to the topic
        myProducer.send(record)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != myProducer) myProducer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the timestamp
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079), // 36.56.0.0-36.63.255.255
      (1038614528, 1039007743), // 61.232.0.0-61.237.255.255
      (1783627776, 1784676351), // 106.80.0.0-106.95.255.255
      (2035023872, 2035154943), // 121.76.0.0-121.77.255.255
      (2078801920, 2079064063), // 123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0-182.92.255.255
      (-770113536, -768606209), // 210.25.0.0-210.47.255.255
      (-569376768, -564133889) // 222.16.0.0-222.95.255.255
    )
    // Random index into the IP range array
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)

    // Convert the int IP to dotted IPv4 format
    number2IpString(ipNumber)
  }

  /** ================= Convert an int IPv4 address to a string ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // return the IPv4 address
    buffer.mkString(".")
  }
}

         The case class used above:

case class OrderRecord(
    orderId: String,
    userId: String,
    orderTime: String,
    ip: String,
    orderMoney: Double,
    orderStatus: Int
)

         2) Consume the Kafka data and write it into the Hudi table:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author oyl
 * @create 2022-07-10 11:31
 * @Description : Reads Kafka data with Flink SQL and writes it into Hudi
 */
public class FlinkSQLReadKafkaToHudi {

    public static void main(String[] args) {

        // 1. Set up the execution environment, using the streaming API
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing
        streamEnv.setParallelism(1)
                .enableCheckpointing(5000);

        EnvironmentSettings setting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()  // streaming mode
                .build();

        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, setting);

        // 2. Create the input table that consumes data from Kafka
        streamTableEnv.executeSql("CREATE TABLE hudi_flink_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'hudi_flink',\n" +
                "  'properties.bootstrap.servers' = 'hadoop100:9092',\n" +
                "  'properties.group.id' = 'gid-1009',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")");
        // 3. Transform the data: derive the ts and partition_day columns
        Table etltable = streamTableEnv.from("hudi_flink_kafka_source")
                .addColumns($("orderId").substring(0, 17).as("ts"))
                .addColumns($("orderTime").substring(0, 10).as("partition_day"));

        streamTableEnv.createTemporaryView("kafka_table", etltable);

        //streamTableEnv.executeSql("select * from kafka_table").print();

        // 4. Create the output table mapped to the Hudi table: table name, storage path, fields, etc.
        streamTableEnv.executeSql("CREATE TABLE hudi_flink_kafka_sink (\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day) \n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'write.operation' = 'upsert',\n" +
                "  'hoodie.datasource.write.recordkey.field'= 'orderId',\n" +
                "  'write.precombine.field' = 'ts',\n" +
                "  'write.tasks'= '1',\n" +
                "  'compaction.tasks' = '1', \n" +
                "  'compaction.async.enabled' = 'true', \n" +
                "  'compaction.trigger.strategy' = 'num_commits', \n" +
                "  'compaction.delta_commits' = '1'\n" +
                ")"
        );
        // 5. Write the data into the output table via a sub-query
        streamTableEnv.executeSql(
                "INSERT INTO hudi_flink_kafka_sink " +
                        "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM kafka_table"
        );
    }
}

        3) Read the Hudi table data:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * @author oyl
 * @create 2022-07-10 14:34
 * @Description Reads Hudi table data with Flink SQL
 */
public class FlinkSQLReadHudi {

    public static void main(String[] args) {
        // 1. Create the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 2. Create the input table that loads the Hudi table data
        tableEnv.executeSql(
                "CREATE TABLE order_hudi(\n" +
                        "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT,\n" +
                        "  ts STRING,\n" +
                        "  partition_day STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH (\n" +
                        "    'connector' = 'hudi',\n" +
                        "    'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',\n" +
                        "    'table.type' = 'MERGE_ON_READ',\n" +
                        "    'read.streaming.enabled' = 'true',\n" +
                        "    'read.streaming.check-interval' = '4'\n" +
                        ")"
        );

        // 3. Run the query and stream the Hudi table data
        tableEnv.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print();
    }
}

         4) Check the files on HDFS.

         5) Finally, the POM file for reference:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata-parent</artifactId>
        <groupId>com.ouyangl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>bigdata-flink</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.12.2</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
    </properties>

    <dependencies>
        <!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>
        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <!-- slf4j及log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>
