1.18.3.Flink Catalog介绍Catalog 定义Catalog 的实现Catalog 使用举例
Posted to.to
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.18.3.Flink Catalog介绍Catalog 定义Catalog 的实现Catalog 使用举例相关的知识,希望对你有一定的参考价值。
1.18.3.Flink Catalog介绍
1.18.3.1.引言
1.18.3.2.Catalog 定义
1.18.3.3.Catalog 的实现
1.18.3.4.Catalog 使用举例
1.18.3.Flink Catalog介绍
1.18.3.1.引言
以下转自:http://legendtkl.com/2020/07/26/flink-catalog/
这篇文章我们介绍了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大数据引擎的同学应该都知道这两个计算引擎都有一个共同的组件叫 Catalog。下面是 Flink 的 Catalog 的官方定义。
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据。
元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的 UDF。
元数据也可以是持久化的,例如 Hive Metastore 中的元数据。
Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。
简单来说,Catalog 就是元数据管理中心,其中元数据包括数据库、表、表结构等信息。
1.18.3.2.Catalog 定义
Flink 的 Catalog 相关代码定义在 catalog.java 文件中,是一个 interface,如下。
/**
* This interface is responsible for reading and writing metadata such as database/table/views/UDFs
* from a registered catalog. It connects a registered catalog and Flink's Table API.
*/
@PublicEvolving
public interface Catalog {
...
}
既然是interface,我们来看一下支持的操作。
我们可以将这些接口做一个简单的分类。
-
Database 相关操作
getDefaultDataBase:获取默认的 database
getDatabase:获取特定的 database
listDatabases:列出所有的 database
databaseExists:判断 database 是否存在
createDatabases:创建 database
dropDatabases:删除 database
alterDatabases:修改 database -
Table 相关操作,一般都会有个参数是database
listTables:列出所有的 table 和 view
getTable:获取指定的 table 或者 view
tableExist:判断 table 或者 view 是否存在
dropTable:删除 table 或者 view
createTable:创建 table 或者 view
renameTable:重命名 table 或者 view
alterTable:修改 table 或者 view -
View 相关操作,除了和 table 共用方法外,还有一个独有的方法。
listViews:列出所有的 view -
Partition 相关操作,partition 是 table 的一个属性,所以参数一般都会带有 table 信息。
listPartition:列出 table 的所有 partition
getPartition:获取指定的 partition
partitionExist:判断 parition 是否存在
createPartition:创建 partition
dropPartition:删除 partition
alterPartition:修改 parition -
Function 相关操作,这里的 function 知道的是用户自定义的 function,也就是 Udf。
listFunctions:列出所有的 function
getFunction:获取指定的 func
functionExist:判断 function 是否存在
dropFunction:删除 function
alterFunction:修改 function
1.18.3.3.Catalog 的实现
从上图我们可以看到 Catalog 的最终实现有三个类:
HiveCatalog:使用 Hive 的元数据来作为 Flink 的 HiveCatalog
GenericInMemoryCatalog:使用内存实现 Catalog
JdbcCatalog:使用其他支持 jdbc 协议的关系型数据库来存储元数据
PostgresCatalog:使用 Postgres 数据库来作为 Catalog 存储元数据
1.18.3.4.Catalog 使用举例
下面的示例是 Flink SQL 使用 Catalog 的示例。
TableEnvironment tableEnv = ...
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
tableEnv.listTables(); // should return the tables in current catalog and database.
下面是 api 的方式来使用 Catalog
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Kafka;
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
// Create a catalog table
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
catalog.createTable(
new ObjectPath("mydb", "mytable"),
new CatalogTableImpl(
schema,
new Kafka()
.version("0.11")
....
.startFromEarlist()
.toProperties(),
"my comment"
),
false
);
List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
1.18.3.5.自定义Catalog
Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。 想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。
CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。
1.18.3.6.总结
这篇文章写的比较简单,相当于自己的学习笔记,下一篇文章我们比较一下Spark 的 Catalog实现。
以上是关于1.18.3.Flink Catalog介绍Catalog 定义Catalog 的实现Catalog 使用举例的主要内容,如果未能解决你的问题,请参考以下文章
干货 | 字节跳动构建Data Catalog数据目录系统的实践(下)