Hudi Learning Part 7: Integrating Hudi with Flink
I. Environment Preparation
1. Base Framework Versions
| Framework | Version | Notes |
| --- | --- | --- |
| Apache Hadoop | 2.7.5 | |
| Apache Hudi | 0.9.0 | |
| Apache Flink | 1.12 | Based on Scala 2.12 |
2. Installing and Deploying the Runtime Environment
2.1 Flink Overview
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
2.2 Installation and Deployment
This article uses Flink 1.12 and deploys it in Flink Standalone cluster mode.
1) Download the installation package from the official Flink website
2) Upload the package to the target directory and extract it
tar -zxvf flink-1.12.2-bin-scala_2.12.tgz -C /opt/module/
3) Start the local Flink cluster
bin/start-cluster.sh
4) Access the Flink Web UI
http://hadoop100:8081/#/overview
5) Run the official example; once the Flink job finishes successfully, the Flink installation and deployment are complete.
bin/flink run examples/batch/WordCount.jar
2.3 Adding the Hudi Dependency
1) Add the Hudi bundle jar
From the compiled Hudi build output under /hudi-0.9.0/packaging/hudi-flink-bundle/target/, copy hudi-flink-bundle_2.12-0.9.0.jar into the lib directory of the Flink installation.
2.4 Updating the Hadoop Environment Variables
vim /etc/profile.d/env.sh  (adjust the environment file to match your own machine)
##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
2.5 Starting the Cluster
1) Set the Hadoop classpath environment variable (this step is important)
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
2) Start Flink
bin/start-cluster.sh
3) Launch the Flink SQL Client
bin/sql-client.sh embedded shell
4) Set the result mode to tableau so results are displayed directly, and set the execution type to batch
set execution.result-mode = tableau;
set execution.type = batch;
II. Working with Hudi from the Flink SQL Client
1. Creating a MOR Table
(If table.type is not specified, the table defaults to COPY_ON_WRITE; a sketch of the COW variant follows the DDL below.)
CREATE TABLE mor_test1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/mor_test1',
'table.type' = 'MERGE_ON_READ'
);
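For comparison, here is a minimal sketch of the same schema as a COW table; the table name cow_test1 and its path are made up for illustration, and simply omitting 'table.type' (or setting it to 'COPY_ON_WRITE') gives the default behavior described above.
CREATE TABLE cow_test1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
-- omitting 'table.type' falls back to the default COPY_ON_WRITE
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/cow_test1'
);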
2. Inserting Data
2.1 Insert multiple rows
INSERT INTO mor_test1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
2.2 Query the data after a successful insert
1) SQL query (the statement is sketched after this list)
2) The job execution status is visible in the Flink Web UI
3) The Hudi table data can be inspected on HDFS
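The query in step 1) is presumably just a plain SELECT over the new table, for example:
-- Presumed query; in batch mode with tableau result mode, the eight inserted rows print directly in the SQL Client.
SELECT * FROM mor_test1;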
3. Updating Data
When operating on Hudi from Flink SQL, many parameters do not need to be set one by one as they do with Spark; they all have default values. The inserts above did not specify a record key the way a Spark job would, but the Flink table still has one: the record key defaults to the uuid field.
3.1 Update the row whose uuid is id1
Change the age from 23 to 27. (Except for an initial overwrite when a table is first loaded, Flink SQL writes default to append mode, so a plain INSERT with an existing key directly triggers an update. A sketch of naming the record key explicitly follows the statement below.)
insert into mor_test1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
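As a sketch only (not part of the original walkthrough), the record key and precombine field can also be named explicitly instead of relying on the uuid default; the options below are the same ones the Kafka sink table uses later in this article, and the table name mor_test_explicit_key is made up:
CREATE TABLE mor_test_explicit_key(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop100/datas/hudi-warehouse/Flink-Hudi/mor_test_explicit_key',
'table.type' = 'MERGE_ON_READ',
-- explicit record key and precombine field instead of the defaults
'hoodie.datasource.write.recordkey.field' = 'uuid',
'write.precombine.field' = 'ts'
);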
III. Streaming Query
A streaming read cannot be demonstrated within a single Flink SQL Client session; it takes two clients, one writing data into the table and the other reading it in real time.
1. Start another client and create table mor_test2
Set the relevant properties so the table is read as a stream. The new table maps onto mor_test1 by pointing at the same storage path, so it reads mor_test1's data in real time.
CREATE TABLE mor_test2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/mor_test1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '2022070323131133',
'read.streaming.check-interval' = '4'
);
Parameter notes:
read.streaming.enabled set to true means the table data is read in streaming mode;
read.streaming.check-interval sets how often the source checks for new commits, here every 4 seconds;
read.streaming.start-commit specifies the commit instant from which the streaming read starts.
2. Querying mor_test2 Reports an Error at First
1) Set the execution type to streaming and run the query
set execution.type=streaming;
2) The streaming query now succeeds (the query is sketched below)
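The streaming query is presumably just a plain SELECT on mor_test2:
-- Presumed streaming query; with read.streaming.enabled = 'true' the result keeps refreshing as new commits land in mor_test1.
SELECT * FROM mor_test2;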
3. Streaming Query from Code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Wrapper class added for completeness; the original article shows only the main method, so the class name is hypothetical
public class StreamingReadHudiDemo {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
        // Create the table, mapped onto the storage path of mor_test1
        tableEnv.executeSql("" +
                "CREATE TABLE mor_test2( \n" +
                "  uuid VARCHAR(20), \n" +
                "  name VARCHAR(10), \n" +
                "  age INT, \n" +
                "  ts TIMESTAMP(3), \n" +
                "  `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`)\n" +
                "WITH ( \n" +
                "  'connector' = 'hudi', \n" +
                "  'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/mor_test1', \n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'read.streaming.enabled' = 'true',\n" +
                "  'read.streaming.start-commit' = '2022070323131133',\n" +
                "  'read.streaming.check-interval' = '4'\n" +
                ")");
        // Continuously print rows as new commits are read
        tableEnv.executeSql("select * from mor_test2").print();
    }
}
The job then keeps waiting for new data to be written.
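To see the streaming job print results, write a new row into mor_test1 from the other SQL Client; the row below is made up in the same style as the earlier inserts:
-- Any new commit to mor_test1 should show up in the streaming job within the 4-second check interval configured above.
INSERT INTO mor_test1 VALUES ('id9','Alice',29,TIMESTAMP '1970-01-01 00:00:09','par1');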
IV. Writing to Hudi in Real Time with Flink SQL
1. Overall Architecture
The StructuredStreaming program from an earlier article is reworked here as a Flink SQL program: it consumes topic data from Kafka in real time, parses and transforms it, and stores it into a Hudi table.
2. Consuming Kafka Data
2.1 Create the Kafka topic (hudi_flink)
bin/kafka-topics.sh --zookeeper hadoop100:2181 --create --replication-factor 1 --partitions 1 --topic hudi_flink
2.2 Consume Kafka data from the Flink SQL Client
1) Create the Kafka source table
CREATE TABLE hudi_flink_kafka_source (
orderId STRING,
userId STRING,
orderTime STRING,
ip STRING,
orderMoney DOUBLE,
orderStatus INT
) WITH (
'connector' = 'kafka',
'topic' = 'hudi_flink',
'properties.bootstrap.servers' = 'hadoop100:9092',
'properties.group.id' = 'gid-1001',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
2) Query the hudi_flink_kafka_source table
Remember to add the Flink SQL Kafka connector jar that matches your Kafka version; it can be downloaded from the official site.
After producing messages into the Kafka topic, you can see Flink SQL consuming them almost immediately; a sample message is shown here, and the presumed query follows it.
{"orderId": "20211122103434136000001", "userId": "300000971", "orderTime": "2021-11-22 10:34:34.136", "ip": "123.232.118.98", "orderMoney": 485.48, "orderStatus": 0}
3. Writing the Consumed Kafka Data into a Hudi Table
3.1 Create the Hudi table
CREATE TABLE hudi_flink_kafka_sink (
orderId STRING PRIMARY KEY NOT ENFORCED,
userId STRING,
orderTime STRING,
ip STRING,
orderMoney DOUBLE,
orderStatus INT,
ts STRING,
partition_day STRING
)
PARTITIONED BY (partition_day)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field'= 'orderId',
'write.precombine.field' = 'ts',
'write.tasks'= '1',
'compaction.tasks' = '1',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits' = '1'
);
3.2 Write data into Hudi
1) Read the hudi_flink_kafka_source data and write it into hudi_flink_kafka_sink
The hudi_flink_kafka_source data needs some light processing because writing into Hudi relies on three configured fields: the record key (default uuid), the precombine timestamp used for merging (default name ts), and the partition field. The Hudi table DDL above already names the record key ('hoodie.datasource.write.recordkey.field' = 'orderId') and the precombine field ('write.precombine.field' = 'ts'), so the SELECT below only has to derive ts and partition_day.
INSERT INTO hudi_flink_kafka_sink
SELECT
orderId, userId, orderTime, ip, orderMoney, orderStatus,
substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day
FROM hudi_flink_kafka_source ;
2) Query the hudi_flink_kafka_sink table (the presumed query is sketched after this list)
3) Check the files on HDFS
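The verification query in step 2) is presumably a SELECT over the sink table, run in streaming mode:
-- Presumed verification query; run with execution.type set to streaming so the result keeps updating as the insert job commits new data.
SELECT * FROM hudi_flink_kafka_sink;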
3.3 Consuming Kafka and writing to Hudi in code
1) Mock Kafka producer
import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * @author oyl
 * @create 2022-07-10 10:44
 * @Description Mock Kafka producer that generates order data
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    // 1. Kafka producer client configuration
    val pro: Properties = new Properties()
    pro.put("bootstrap.servers", "hadoop100:9092,hadoop101:9092")
    pro.put("acks", "1")
    pro.put("retries", "3")
    pro.put("key.serializer", classOf[StringSerializer].getName)
    pro.put("value.serializer", classOf[StringSerializer].getName)
    // Create the KafkaProducer with the configuration above
    val myProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](pro)
    // Random number generator
    val random: Random = new Random()
    // Possible order statuses
    val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0)
    // Number of records per batch
    val batchNumber: Int = random.nextInt(1) + 5
    try {
      (1 to batchNumber).foreach { number =>
        val currentTime: Long = System.currentTimeMillis()
        val orderId: String = s"${getDate(currentTime)}%06d".format(number)
        val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
        val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
        val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
        val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
        // 3. Build the order record
        val orderRecord: OrderRecord = OrderRecord(
          orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
        )
        // Serialize the record to JSON
        val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
        println(orderJson)
        // 4. Build the ProducerRecord
        val record = new ProducerRecord[String, String]("hudi_flink", orderId, orderJson)
        // 5. Send the record to the topic
        myProducer.send(record)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != myProducer) myProducer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // formatted date string
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges (as Int values)
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0-36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0-61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0-106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0-121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0-182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0-210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0-222.95.255.255
    )
    // Pick a random range index, then a random address within that range
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int IP value to dotted IPv4 notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a string ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address string
    buffer.mkString(".")
  }
}

Case class:
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)
2) Consume Kafka data and write it into the Hudi table
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author oyl
 * @create 2022-07-10 11:31
 * @Description Flink SQL job that reads Kafka data and writes it into Hudi
 */
public class FlinkSQLReadKafkaToHudi {
    public static void main(String[] args) {
        // 1. Set up the execution environment, using the streaming API
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism and enable checkpointing (every 5 seconds)
        streamEnv.setParallelism(1)
                .enableCheckpointing(5000);
        EnvironmentSettings setting = EnvironmentSettings
                .newInstance()
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, setting);
        // 2. Create the input table that consumes data from Kafka
        streamTableEnv.executeSql("CREATE TABLE hudi_flink_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'hudi_flink',\n" +
                "  'properties.bootstrap.servers' = 'hadoop100:9092',\n" +
                "  'properties.group.id' = 'gid-1009',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")");
        // 3. Transform the data: derive the ts and partition_day columns
        Table etltable = streamTableEnv.from("hudi_flink_kafka_source")
                .addColumns($("orderId").substring(0, 17).as("ts"))
                .addColumns($("orderTime").substring(0, 10).as("partition_day"));
        streamTableEnv.createTemporaryView("kafka_table", etltable);
        //streamTableEnv.executeSql("select * from kafka_table").print();
        // 4. Create the output table mapped to the Hudi table: table name, storage path, fields, and so on
        streamTableEnv.executeSql("CREATE TABLE hudi_flink_kafka_sink (\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'write.operation' = 'upsert',\n" +
                "  'hoodie.datasource.write.recordkey.field'= 'orderId',\n" +
                "  'write.precombine.field' = 'ts',\n" +
                "  'write.tasks'= '1',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '1'\n" +
                ")"
        );
        // 5. Write into the output table via a sub-query
        streamTableEnv.executeSql(
                "INSERT INTO hudi_flink_kafka_sink " +
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM kafka_table"
        );
    }
}
3) Read the Hudi table data
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * @author oyl
 * @create 2022-07-10 14:34
 * @Description Read Hudi table data with Flink SQL
 */
public class FlinkSQLReadHudi {
    public static void main(String[] args) {
        // 1. Get the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // 2. Create the input table mapped to the Hudi table
        tableEnv.executeSql(
                "CREATE TABLE order_hudi(\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hadoop100:9000/datas/hudi-warehouse/Flink-Hudi/hudi_flink_kafka_sink',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'read.streaming.enabled' = 'true',\n" +
                "  'read.streaming.check-interval' = '4'\n" +
                ")"
        );
        // 3. Run the query and stream-print the Hudi table data
        tableEnv.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print();
    }
}
4) Check the files on HDFS
5) Finally, the POM file for reference
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bigdata-parent</artifactId>
<groupId>com.ouyangl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bigdata-flink</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.12.2</flink.version>
<hadoop.version>2.7.3</hadoop.version>
<mysql.version>8.0.16</mysql.version>
</properties>
<dependencies>
<!-- Flink Client -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<!-- MySQL/FastJson/lombok -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- slf4j及log4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>