数据湖架构HudiHudi集成Flink案例详解

Posted undo_try

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据湖架构HudiHudi集成Flink案例详解相关的知识,希望对你有一定的参考价值。

五、Hudi集成Flink案例详解

5.1 hudi集成flink

flink的下载地址:

https://archive.apache.org/dist/flink/

| Hudi   | Supported Flink version |
| ------ | ----------------------- |
| 0.12.x | 1.15.x, 1.14.x, 1.13.x  |
| 0.11.x | 1.14.x, 1.13.x          |
| 0.10.x | 1.13.x                  |
| 0.9.0  | 1.12.2                  |
  • 将上述编译好的hudi-flink-bundle包拷贝到flink的lib目录中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
  • 拷贝guava包,解决依赖冲突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
  • 配置Hadoop环境变量
vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile.d/my_env.sh

5.2 sql-client之yarn-session模式

配置hadoop调度器yarn

mapred-site.xml


<configuration>
<!-- 指定MapReduce作业执行时,使用YARN进行资源调度 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>  
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>
    </property>
</configuration>


yarn-site.xml

<configuration>
<!-- 设置ResourceManager -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>centos04</value>              
    </property>
<!--配置yarn的shuffle服务-->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>


hadoop-env.sh
# 在最后面添加如下:
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

# 记得配置sql-client-defaults.yaml

5.2.1 启动

# 1、修改配置文件
vim /opt/apps/flink-1.13.6/conf/flink-conf.yaml


classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

 
state.backend: rocksdb
execution.checkpointing.interval: 30000 # 开启checkpoint(间隔30秒);Hudi在checkpoint时才会把缓冲的数据flush并提交,查询才能看到新数据
state.checkpoints.dir: hdfs://centos04:9000/ckps
state.backend.incremental: true


# 2、yarn-session模式启动

# 解决依赖问题
cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/

# 启动yarn-session
/opt/apps/flink-1.13.6/bin/yarn-session.sh -d
# 启动sql-client
/opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

5.2.2 插入数据

set sql-client.execution.result-mode=tableau;

-- 创建hudi表
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ' -- 默认是COW
);


或如下写法(与上面的建表语句等价,二者选其一执行即可):
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'
);


-- 插入数据
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
  
  
  
-- 查询数据
select * from t1;

5.2.3 流式插入

-- 1、创建测试表
CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);

-- 2、执行插入
insert into t2 select * from sourceT;


查询结果
set sql-client.execution.result-mode=tableau;
Flink SQL> select * from t2 limit 10;  -- 会产生一个collect的flink任务,拉取10条数据,注意:不是流读取
2023-03-06 22:45:10,403 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-03-06 22:45:12,897 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at centos04/192.168.42.104:8032
2023-03-06 22:45:12,899 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-03-06 22:45:12,918 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'.
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+
| op |                           uuid |                           name |         age |                      ts |                      partition |
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+
| +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... |  1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... |
| +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... |   -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... |
| +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... |  2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... |
| +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... |   519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... |
| +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... |  -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... |
| +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... |  2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... |
| +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... |  -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... |
| +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... |
| +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... |    71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... |
| +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... |  1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... |
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

5.3 使用IDEA开发

除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。

1、首先,需要将hudi集成flink的jar包,装载到本地的仓库,命令如下:

D:\bigdata\hudi从入门到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom ---
[INFO] Installing D:\bigdata\hudi从入门到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar
[INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.111 s
[INFO] Finished at: 2023-03-02T10:08:15+08:00
[INFO] ------------------------------------------------------------------------

2、导入pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hudi-start</artifactId>
        <groupId>com.yyds</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hudi-flink</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <hudi.version>0.12.0</hudi.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>   <!--不会打包到依赖中,只参与编译,不参与运行 -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--idea运行时也有webui-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
        

以上是关于数据湖架构HudiHudi集成Flink案例详解的主要内容,如果未能解决你的问题,请参考以下文章

数据湖架构HudiHudi核心概念

Flink集成Iceberg在同程艺龙的实践

2万字,详解数据湖,概念特征架构方案场景以及建湖全过程(建议收藏)

重磅!Impala 3.4正式集成开源数据湖框架Apache Hudi

flink平台项目-cnblog

Spark+Flink+Iceberg打造湖仓一体架构实践探索