数据湖架构Hudi(五)Hudi集成Flink案例详解
Posted undo_try
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据湖架构Hudi(五)Hudi集成Flink案例详解的相关知识,希望对你有一定的参考价值。
五、Hudi集成Flink案例详解
5.1 hudi集成flink
flink的下载地址:
https://archive.apache.org/dist/flink/
Hudi | Supported Flink version |
---|---|
0.12.x | 1.15.x、1.14.x、1.13.x |
0.11.x | 1.14.x、1.13.x |
0.10.x | 1.13.x |
0.9.0 | 1.12.2 |
- 将上述编译好的安装包拷贝到flink下的lib目录中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
- 拷贝guava包,解决依赖冲突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
- 配置Hadoop环境变量
vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source /etc/profile.d/my_env.sh
5.2 sql-client之yarn-session模式
配置hadoop调度器yarn
mapred-site.xml
<configuration>
    <!-- Run MapReduce jobs on YARN, which handles resource scheduling. -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <!-- HADOOP_MAPRED_HOME is set to the Hadoop install directory in the
         environment settings for the MR application master, map tasks and reduce tasks. -->
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value>
    </property>
</configuration>
yarn-site.xml
<configuration>
    <!-- Host on which the ResourceManager runs. -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>centos04</value>
    </property>

    <!-- Shuffle auxiliary service for YARN NodeManagers. -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>
hadoop-env.sh
# 在最后面添加如下:
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
# 记得配置sql-client-defaults.yaml
5.2.1 启动
# 1、修改配置文件
vim /opt/apps/flink-1.13.6/conf/flink-conf.yaml
# Checkpointing: must be enabled so buffered data is flushed out of memory promptly.
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://centos04:9000/ckps

# State backend.
state.backend: rocksdb
state.backend.incremental: true

# Task slots and classloader check.
taskmanager.numberOfTaskSlots: 4
classloader.check-leaked-classloader: false
# 2、yarn-session模式启动
# 解决依赖问题
cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/
# 启动yarn-session
/opt/apps/flink-1.13.6/bin/yarn-session.sh -d
# 启动sql-client
/opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session
5.2.2 插入数据
-- Display query results in the SQL client using the tableau result mode.
SET sql-client.execution.result-mode=tableau;
-- 创建hudi表
-- Hudi table t1: uuid is the (non-enforced) primary key, declared inline on the column;
-- rows are partitioned by the `partition` column.
CREATE TABLE t1 (
    uuid        VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
    'connector'  = 'hudi',
    'path'       = 'hdfs://centos04:9000/tmp/hudi_flink/t1',
    'table.type' = 'MERGE_ON_READ'  -- the default is COW (copy-on-write)
);
或如下写法
-- Hudi table t1 with the (non-enforced) primary key declared as a table-level constraint;
-- rows are partitioned by the `partition` column.
CREATE TABLE t1 (
    uuid        VARCHAR(20),
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
    'connector'  = 'hudi',
    'path'       = 'hdfs://centos04:9000/tmp/hudi_flink/t1',
    'table.type' = 'MERGE_ON_READ'
);
-- 插入数据
-- Load eight sample rows into t1, two for each of the partitions par1 through par4.
INSERT INTO t1
VALUES
    ('id1', 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    ('id2', 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    ('id3', 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
    ('id4', 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
    ('id5', 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
    ('id6', 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
    ('id7', 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
    ('id8', 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');
-- 查询数据
-- Read back all rows and columns of t1.
SELECT *
FROM t1;
5.2.3 流式插入
-- 1、创建测试表
-- Test source backed by the datagen connector, configured for one row per second.
CREATE TABLE sourceT (
    uuid        VARCHAR(20),
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
)
WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '1'
);
-- Hudi table t2, the sink for the streaming insert from sourceT.
-- The record key is declared explicitly as a (non-enforced) primary key on uuid
-- instead of relying on the connector's default key field.
-- The path carries the same hdfs:// authority as t1, so the table location does not
-- depend on the client's fs.defaultFS setting.
CREATE TABLE t2 (
    uuid        VARCHAR(20),
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
)
WITH (
    'connector'  = 'hudi',
    'path'       = 'hdfs://centos04:9000/tmp/hudi_flink/t2',
    'table.type' = 'MERGE_ON_READ'
);
-- 2、执行插入
-- Copy rows from sourceT into t2.
-- Columns are listed explicitly (in t2's declared order) so the mapping does not
-- silently change if sourceT's column order ever changes.
INSERT INTO t2
SELECT
    uuid,
    name,
    age,
    ts,
    `partition`
FROM sourceT;
查询结果
set sql-client.execution.result-mode=tableau;
Flink SQL> select * from t2 limit 10; -- 会产生一个collect的flink任务,拉取10条数据,注意:不是流读取
2023-03-06 22:45:10,403 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-03-06 22:45:12,897 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at centos04/192.168.42.104:8032
2023-03-06 22:45:12,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-03-06 22:45:12,918 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'.
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+
| op | uuid | name | age | ts | partition |
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+
| +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... | 1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... |
| +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... |
| +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... | 2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... |
| +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... |
| +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... | -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... |
| +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... | 2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... |
| +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... | -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... |
| +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... |
| +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... | 71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... |
| +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... | 1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... |
+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+
5.3 使用IDEA开发
除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。
1、首先,需要将hudi集成flink的jar包,装载到本地的仓库,命令如下:
D:\\bigdata\\hudi从入门到精通\\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom ---
[INFO] Installing D:\\bigdata\\hudi从入门到精通\\apps\\hudi-flink1.13-bundle-0.12.0.jar to D:\\doit\\apps\\repository\\org\\apache\\hudi\\hudi-flink_2.12\\0.12.0\\hudi-flink_2.12-0.12.0.jar
[INFO] Installing C:\\Users\\Undo\\AppData\\Local\\Temp\\mvninstall50353756903805721.pom to D:\\doit\\apps\\repository\\org\\apache\\hudi\\hudi-flink_2.12\\0.12.0\\hudi-flink_2.12-0.12.0.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.111 s
[INFO] Finished at: 2023-03-02T10:08:15+08:00
[INFO] ------------------------------------------------------------------------
2、导入pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi-start</artifactId>
<groupId>com.yyds</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-flink</artifactId>
<properties>
    <!-- Java / compiler level -->
    <java.version>1.8</java.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>

    <!-- Versions of Flink, Hudi, the Scala binary suffix and SLF4J -->
    <flink.version>1.13.6</flink.version>
    <hudi.version>0.12.0</hudi.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- flink-java. Scope "provided": used for compilation only, not bundled into the packaged artifact. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- flink-streaming-java; the artifact id carries the Scala binary version suffix. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- flink-clients; the artifact id carries the Scala binary version suffix. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- flink-table-planner-blink; the artifact id carries the Scala binary version suffix. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Makes the web UI available when running from IDEA as well. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- slf4j-api, versioned through the slf4j.version property. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <!-- Maven property references need the ${...} form to be interpolated. -->
    <version>${slf4j.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
以上是关于数据湖架构HudiHudi集成Flink案例详解的主要内容,如果未能解决你的问题,请参考以下文章
2万字,详解数据湖,概念特征架构方案场景以及建湖全过程(建议收藏)