FlinkSQL 整合 Hive-- flink-1.13.6
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkSQL 整合 Hive-- flink-1.13.6相关的知识,希望对你有一定的参考价值。
文章目录
一、概览
Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。
Flink 与 Hive 的集成包含两个层面。
-
一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。
-
二是利用 Flink 来读写 Hive 的表。
HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
配置 flink-conf.yaml
classloader.check-leaked-classloader: false
创建 hivecatalog
CREATE CATALOG myhive WITH (
\'type\' = \'hive\',
\'default-database\' = \'mydatabase\',
\'hive-conf-dir\' = \'/opt/hive-conf\'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
注意: Flink1.13开始移除了sql-client-defaults.yml配置⽂件,所以在该配置⽂件配置catalog的⽅法就不存在了,目前相关配置添加到sql-init.sql
文件即可。
二、Hive 方言
从 1.11.0 开始,在使用 Hive 方言时,Flink 允许用户用 Hive 语法来编写 SQL 语句。通过提供与 Hive 语法的兼容性,我们旨在改善与 Hive 的互操作性,并减少用户需要在 Flink 和 Hive 之间切换来执行不同语句的情况。
2.1、使用 Hive 方言
Flink 目前支持两种 SQL 方言: default 和 hive。你需要先切换到 Hive 方言,然后才能使用 Hive 语法编写
可以在 SQL 客户端启动后设置方言。
Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
[INFO] Session property has been set.
Flink SQL> set table.sql-dialect=default; -- to use default dialect
[INFO] Session property has been set.
2.2、案例
Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.
Flink SQL> load module hive;
[INFO] Execute statement succeed.
Flink SQL> use modules hive,core;
[INFO] Execute statement succeed.
Flink SQL> set execution.type=batch;
[WARNING] The specified key \'execution.type\' is deprecated. Please use \'execution.runtime-mode\' instead.
[INFO] Session property has been set.
Flink SQL> SET sql-client.execution.result-mode=TABLEAU;
[INFO] Session property has been set.
Flink SQL> set table.sql-dialect=hive; -- 设置hive方言
[INFO] Session property has been set.
showFlink SQL> select explode(array(1,2,3)); -- 使用 hive udf 函数
Hive Session ID = f01725e8-895d-430c-957a-367729e466ca
+-----+
| col |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
3 rows in set
Flink SQL> drop table if exists tbl;
Hive Session ID = 0731f471-d579-45f1-8eb9-e38a8d68f8e8
[INFO] Execute statement succeed.
Flink SQL> create table tbl (key int,value string);
Hive Session ID = bcd020ed-55aa-4859-be07-a134330c53a3
[INFO] Execute statement succeed.
Flink SQL> insert overwrite table tbl values (5,\'e\'),(1,\'a\'),(1,\'a\'),(3,\'c\'),(2,\'b\'),(3,\'c\'),(3,\'c\'),(4,\'d\');
Hive Session ID = c932ebe6-55b0-4310-9558-59eb6e500862
[INFO] Submitting SQL update statement to the cluster...
2022-11-07 23:42:11,433 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b7c6c304ac487c919243bf71eff0b1a8
Flink SQL> select * from tbl cluster by key; -- run cluster by
Hive Session ID = 1b8d0c3b-5a95-45cf-93d7-02a57297df99
2022-11-07 23:42:16,265 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
+-----+-------+
| key | value |
+-----+-------+
| 1 | a |
| 1 | a |
| 2 | b |
| 3 | c |
| 3 | c |
| 3 | c |
| 4 | d |
| 5 | e |
+-----+-------+
8 rows in set
Shutting down the session...
done.
报错1 Caused by: java.lang.ClassNotFoundException: org.antlr.runtime.tree.CommonTree
, 缺少依赖包
// add antlr-runtime if you need to use hive dialect
antlr-runtime-3.5.2.jar
报错2:
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
三、Hive Read & Write
通过使用HiveCatalog, Apache Flink可以对Apache Hive表进行统一的BATCH和STREAM处理。这意味着Flink可以作为Hive批处理引擎的一个性能更好的替代方案,或者可以在Hive表中持续地读写数据,为实时数据仓库应用提供动力。
3.1、写
Flink支持以 批处理(Batch)和流处理(Streaming) 的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式。
3.1.1、批处理模式写入
向非分区表写入数据
Flink SQL> SET table.sql-dialect=hive; -- hive 方言
Flink SQL> set execution.runtime-mode=\'batch\'; -- 使用批处理模式
Flink SQL> create table `users` (id int, name string); -- 在 flinksql 中创建 hive 非分区表
Flink SQL> INSERT INTO users SELECT 2,\'tom\';
向分区表写入数据
Flink SQL> SET table.sql-dialect=hive; -- hive 方言
Flink SQL> set execution.runtime-mode=\'batch\'; -- 使用批处理模式
Flink SQL> create table `users_p` (id int, name string) partitioned by (create_day string);
-- 向静态分区表写入数据
Flink SQL> INSERT OVERWRITE users_p PARTITION (create_day=\'2022-11-08\') SELECT 1, \'tom\';
-- 向动态分区表写入数据
Flink SQL> INSERT OVERWRITE users_p SELECT 1, \'tom\', \'2022-11-08\';
3.1.2、流处理模式写入
流式写入Hive表,不支持**Insert overwrite **方式,否则报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
案例:
Flink SQL> set execution.runtime-mode=streaming;
-- 创建 hive表
SET table.sql-dialect=hive; -- hive方言
CREATE TABLE hive_table (
user_id string,
item_id string,
category_id string,
behavior string
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
\'partition.time-extractor.timestamp-pattern\'=\'$dt $hr:00:00\',
\'sink.partition-commit.trigger\'=\'partition-time\',
\'sink.partition-commit.delay\'=\'1 h\',
\'sink.partition-commit.policy.kind\'=\'metastore,success-file\'
);
-- 创建 kafka 表
SET table.sql-dialect=default; -- 默认,即flinksql
CREATE TABLE user_behavior (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`ts` timestamp(3),
`proctime` as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL \'5\' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
\'connector\' = \'kafka\', -- 使用 kafka connector
\'topic\' = \'user_behavior\', -- kafka topic
\'scan.startup.mode\' = \'latest-offset\', -- 从起始 offset 开始读取
\'properties.bootstrap.servers\' = \'chb1:9092\',
\'properties.group.id\' = \'testGroup\',
\'format\' = \'csv\'
);
-- streaming sql, insert into hive table
INSERT INTO hive_table
SELECT user_id, item_id,category_id,behavior, DATE_FORMAT(`ts`, \'yyyy-MM-dd\'), DATE_FORMAT(`ts`, \'HH\')
FROM user_behavior;
-- batch sql,查询Hive表的分区数据
SELECT * FROM hive_table WHERE dt=\'2021-01-04\' AND hr=\'16\';
尖叫提示:
-
1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。
-
2.只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成_SUCCESS文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:
- state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
3.2、读
Flink支持以 批处理(Batch)和流处理(Streaming) 的方式读取Hive中的表。批处理的方式与Hive的本身查询类似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,并且会增量地提取新的数据。默认情况下,Flink是以批处理的方式读取Hive表。
关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。
在 SQL Client 中需要显示地开启 SQL Hint 功能
Flink SQL> set table.dynamic-table-options.enabled= true;
使用SQLHint流式查询Hive表
SELECT * FROM hive_table/*+ OPTIONS(\'streaming-source.enable\'=\'true\', \'streaming-source.consume-start-offset\'=\'2021-01-03\') */;
3.3、Temporal Table Join
四、Hive Functions
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/hive_read_write/#writing
https://zhuanlan.zhihu.com/p/434562109
http://www.manongjc.com/detail/22-ygevltlxxowgkno.html
以上是关于FlinkSQL 整合 Hive-- flink-1.13.6的主要内容,如果未能解决你的问题,请参考以下文章
23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置
23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置
FlinkSQL 整合 Hive-- flink-1.13.6