FlinkSQL 整合 Hive-- flink-1.13.6

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkSQL 整合 Hive-- flink-1.13.6相关的知识,希望对你有一定的参考价值。

文章目录

一、概览

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

  • 一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

  • 二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

配置 flink-conf.yaml

classloader.check-leaked-classloader: false

创建 hivecatalog

CREATE CATALOG myhive WITH (
    \'type\' = \'hive\',
    \'default-database\' = \'mydatabase\',
    \'hive-conf-dir\' = \'/opt/hive-conf\'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

注意: Flink1.13开始移除了sql-client-defaults.yml配置⽂件,所以在该配置⽂件配置catalog的⽅法就不存在了,目前相关配置添加到sql-init.sql文件即可。

二、Hive 方言

从 1.11.0 开始,在使用 Hive 方言时,Flink 允许用户用 Hive 语法编写 SQL 语句。通过提供与 Hive 语法的兼容性,我们旨在改善与 Hive 的互操作性,并减少用户需要在 Flink 和 Hive 之间切换来执行不同语句的情况。

2.1、使用 Hive 方言

Flink 目前支持两种 SQL 方言: default 和 hive。你需要先切换到 Hive 方言,然后才能使用 Hive 语法编写

可以在 SQL 客户端启动后设置方言。


Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
[INFO] Session property has been set.

Flink SQL> set table.sql-dialect=default; -- to use default dialect
[INFO] Session property has been set.

2.2、案例

Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.
Flink SQL> load module hive;
[INFO] Execute statement succeed.
Flink SQL> use modules hive,core;
[INFO] Execute statement succeed.
Flink SQL> set execution.type=batch;
[WARNING] The specified key \'execution.type\' is deprecated. Please use \'execution.runtime-mode\' instead.
[INFO] Session property has been set.
Flink SQL> SET sql-client.execution.result-mode=TABLEAU;
[INFO] Session property has been set.

Flink SQL> set table.sql-dialect=hive;			-- 设置hive方言
[INFO] Session property has been set.

showFlink SQL> select explode(array(1,2,3));       -- 使用 hive udf 函数
Hive Session ID = f01725e8-895d-430c-957a-367729e466ca
+-----+
| col |
+-----+
|   1 |
|   2 |
|   3 |
+-----+
3 rows in set
Flink SQL> drop table if exists tbl;
Hive Session ID = 0731f471-d579-45f1-8eb9-e38a8d68f8e8
[INFO] Execute statement succeed.
Flink SQL> create table tbl (key int,value string);
Hive Session ID = bcd020ed-55aa-4859-be07-a134330c53a3
[INFO] Execute statement succeed.
Flink SQL> insert overwrite table tbl values (5,\'e\'),(1,\'a\'),(1,\'a\'),(3,\'c\'),(2,\'b\'),(3,\'c\'),(3,\'c\'),(4,\'d\');
Hive Session ID = c932ebe6-55b0-4310-9558-59eb6e500862
[INFO] Submitting SQL update statement to the cluster...
2022-11-07 23:42:11,433 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b7c6c304ac487c919243bf71eff0b1a8

Flink SQL> select * from tbl cluster by key;  -- run cluster by
Hive Session ID = 1b8d0c3b-5a95-45cf-93d7-02a57297df99
2022-11-07 23:42:16,265 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
+-----+-------+
| key | value |
+-----+-------+
|   1 |     a |
|   1 |     a |
|   2 |     b |
|   3 |     c |
|   3 |     c |
|   3 |     c |
|   4 |     d |
|   5 |     e |
+-----+-------+
8 rows in set

Shutting down the session...
done.

报错1 Caused by: java.lang.ClassNotFoundException: org.antlr.runtime.tree.CommonTree, 缺少依赖包


       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar

报错2:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

三、Hive Read & Write

通过使用HiveCatalog, Apache Flink可以对Apache Hive表进行统一的BATCH和STREAM处理。这意味着Flink可以作为Hive批处理引擎的一个性能更好的替代方案,或者可以在Hive表中持续地读写数据,为实时数据仓库应用提供动力。

3.1、写

Flink支持以 批处理(Batch)和流处理(Streaming) 的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式。

3.1.1、批处理模式写入

非分区表写入数据


Flink SQL> SET table.sql-dialect=hive;  -- hive 方言
Flink SQL> set execution.runtime-mode=\'batch\'; -- 使用批处理模式
Flink SQL> create table `users` (id int, name string);  -- 在 flinksql 中创建 hive 非分区表
Flink SQL> INSERT INTO users SELECT 2,\'tom\';

分区表写入数据

Flink SQL> SET table.sql-dialect=hive;  -- hive 方言
Flink SQL> set execution.runtime-mode=\'batch\'; -- 使用批处理模式

Flink SQL> create table `users_p` (id int, name string) partitioned by (create_day string);
-- 向静态分区表写入数据
Flink SQL> INSERT OVERWRITE users_p PARTITION (create_day=\'2022-11-08\') SELECT 1, \'tom\';
-- 向动态分区表写入数据
Flink SQL> INSERT OVERWRITE users_p SELECT 1, \'tom\', \'2022-11-08\';

3.1.2、流处理模式写入

流式写入Hive表,不支持**Insert overwrite **方式,否则报如下错误:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

案例:

Flink SQL> set execution.runtime-mode=streaming; 

-- 创建 hive表
SET table.sql-dialect=hive; -- hive方言
CREATE TABLE hive_table (
    user_id string,
    item_id string,
    category_id string,
    behavior string
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  \'partition.time-extractor.timestamp-pattern\'=\'$dt $hr:00:00\',
  \'sink.partition-commit.trigger\'=\'partition-time\',
  \'sink.partition-commit.delay\'=\'1 h\',
  \'sink.partition-commit.policy.kind\'=\'metastore,success-file\'
);

-- 创建 kafka 表
SET table.sql-dialect=default; -- 默认,即flinksql
CREATE TABLE user_behavior (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    `ts` timestamp(3),
	`proctime` as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL \'5\' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    \'connector\' = \'kafka\', -- 使用 kafka connector
    \'topic\' = \'user_behavior\',  -- kafka topic
    \'scan.startup.mode\' = \'latest-offset\', -- 从起始 offset 开始读取
	\'properties.bootstrap.servers\' = \'chb1:9092\',
	\'properties.group.id\' = \'testGroup\',
	\'format\' = \'csv\'
);


-- streaming sql, insert into hive table
INSERT INTO hive_table 
SELECT user_id, item_id,category_id,behavior, DATE_FORMAT(`ts`, \'yyyy-MM-dd\'), DATE_FORMAT(`ts`, \'HH\') 
FROM user_behavior;

-- batch sql,查询Hive表的分区数据
SELECT * FROM hive_table WHERE dt=\'2021-01-04\' AND  hr=\'16\';

尖叫提示:

  • 1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。

  • 2.只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成_SUCCESS文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:

    • state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

3.2、读

Flink支持以 批处理(Batch)和流处理(Streaming) 的方式读取Hive中的表。批处理的方式与Hive的本身查询类似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,并且会增量地提取新的数据。默认情况下,Flink是以批处理的方式读取Hive表。

关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。

在 SQL Client 中需要显示地开启 SQL Hint 功能

Flink SQL> set table.dynamic-table-options.enabled= true;  

使用SQLHint流式查询Hive表

SELECT * FROM hive_table/*+ OPTIONS(\'streaming-source.enable\'=\'true\', \'streaming-source.consume-start-offset\'=\'2021-01-03\') */;

3.3、Temporal Table Join

四、Hive Functions

参考:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/hive_read_write/#writing
https://zhuanlan.zhihu.com/p/434562109
http://www.manongjc.com/detail/22-ygevltlxxowgkno.html

以上是关于FlinkSQL 整合 Hive-- flink-1.13.6的主要内容,如果未能解决你的问题,请参考以下文章

23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置

23.Flink-高级特性-新特性-Streaming Flie Sink介绍代码演示Flink-高级特性-新特性-FlinkSQL整合Hive添加依赖和jar包和配置

FlinkSQL 整合 Hive-- flink-1.13.6

数据湖(十八):Flink与Iceberg整合SQL API操作

Flinksql ----HiveCatalog

Flink StreamingFileSink 处理流程