数据湖(十八):Flink与Iceberg整合SQL API操作

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据湖(十八):Flink与Iceberg整合SQL API操作相关的知识,希望对你有一定的参考价值。

文章目录

Flink与Iceberg整合SQL API操作

一、​​​​​​​​​​​​​​SQL API 创建Iceberg表并写入数据

1、创建新项目,导入如下maven依赖包

2、编写Flink SQL 创建Iceberg表并写入数据

3、在Hive中映射Iceberg表并查询

二、​​​​​​​​​​​​​​SQL API 批量查询Iceberg表数据

三、​​​​​​​​​​​​​​SQL API 实时查询Iceberg表数据

四、​​​​​​​​​​​​​​SQL API指定基于快照实时增量读取数据


Flink与Iceberg整合SQL API操作

Flink SQL 在操作Iceberg时,对应的版本为Flink 1.11.x 与Iceberg0.11.1版本,目前,Flink1.14.2版本与Iceberg0.12.1版本对于SQL API 来说兼容有问题,所以这里使用Flink1.11.6版本与Iceberg0.11.1版本来演示Flink SQL API 操作Iceberg。

一、​​​​​​​​​​​​​​SQL API 创建Iceberg表并写入数据

1、创建新项目,导入如下maven依赖包

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
  <flink.version>1.11.6</flink.version>
  <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
  <!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime</artifactId>
    <version>0.11.1</version>
  </dependency>

  <!-- java 开发Flink 所需依赖 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>$flink.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <!-- Flink Kafka连接器的依赖 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>$flink.version</version>
  </dependency>

  <!-- 读取hdfs文件需要jar包-->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>$hadoop.version</version>
  </dependency>

  <!-- Flink SQL & Table-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>$flink.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>$flink.version</version>
  </dependency>


  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>

2、编写Flink SQL 创建Iceberg表并写入数据

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000);

        //1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2.使用当前Catalog
        tblEnv.useCatalog("hadoop_iceberg");

        //3.创建数据库
        tblEnv.executeSql("create database iceberg_db");

        //4.使用数据库
        tblEnv.useDatabase("iceberg_db");

        //5.创建iceberg表 flink_iceberg_tbl
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

        //6.写入数据到表 flink_iceberg_tbl
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

3、在Hive中映射Iceberg表并查询

在Hive中执行如下命令创建对应的Iceberg表:

#在Hive中创建Iceberg表
CREATE TABLE flink_iceberg_tbl2  (
  id int, 
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
#在Hive中查询Iceberg表中的数据
hive> select * from flink_iceberg_tbl2;
OK
3	ww	20	guangzhou
1	zs	18	beijing
2	ls	19	shanghai

二、​​​​​​​​​​​​​​SQL API 批量查询Iceberg表数据

Flink SQL API 批量查询Iceberg表数据,直接查询显示即可。代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.批量读取表数据
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

tableResult.print();

结果如下:

三、​​​​​​​​​​​​​​SQL API 实时查询Iceberg表数据

Flink SQL API 实时查询Iceberg表数据时需要设置参数“table.dynamic-table-options.enabled”为true,以支持SQL语法中的“OPTIONS”选项,代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2.从Iceberg表当前快照读取所有数据,并继续增量读取数据
// streaming指定为true支持实时读取数据,monitor_interval 监控数据的间隔,默认1s
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

 启动以上代码后,可以看到会将目前存在于Iceberg表中的数据读取出来,向Hive中对应的Iceberg表中插入数据,可以看到控制台实时获取数据。

#在向Hive的Iceberg表中插入数据之前需要加入以下两个包:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

#向Hive 中Iceberg 表插入两条数据
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

在控制台可以看到实时新增数据

 

四、​​​​​​​​​​​​​​SQL API指定基于快照实时增量读取数据

Flink SQL API 还支持基于某个snapshot-id来继续实时获取数据,代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
      "'type'='iceberg'," +
      "'catalog-type'='hadoop'," +
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2.从Iceberg 指定的快照继续实时读取数据,快照ID从对应的元数据中获取
//start-snapshot-id :快照ID
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

以上是关于数据湖(十八):Flink与Iceberg整合SQL API操作的主要内容,如果未能解决你的问题,请参考以下文章

数据湖(十七):Flink与Iceberg整合DataStream API操作

数据湖(二十):Flink兼容Iceberg目前不足和Iceberg与Hudi对比

数据湖(二十):Flink兼容Iceberg目前不足和Iceberg与Hudi对比

数据湖(十四):Spark与Iceberg整合查询操作

数据湖(十五):Spark与Iceberg整合写操作

数据湖:Hive与Iceberg整合