A Detailed Look at the Unified Data Catalog for Storage-Compute Separation
Posted by 咬定青松
This article was first published on the WeChat official account 码上观世界.
What Is a Data Catalog
Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.
There are three kinds of metadata:
Technical metadata: Schemas, tables, columns, file names, report names – anything that is documented in the source system
Business metadata: This is typically the business knowledge that users have about the assets in the organization. This might include business descriptions, comments, annotations, classifications, fitness-for-use, ratings, and more.
Operational metadata: When was this object refreshed? Which ETL job created it? How many times has a table been accessed by users—and which one?
Challenges a Data Catalog Can Address
There are many challenges with finding and accessing the right data. These include:
Wasted time and effort on finding and accessing data
Data lakes turning into data swamps
No common business vocabulary
Hard to understand structure and variety of “dark data”
Difficult to assess provenance, quality, trustworthiness
No way to capture tribal or missing knowledge
Difficult to reuse knowledge and data assets
Manual and ad-hoc data prep efforts
Not only is data access becoming a challenge, but data governance has become a challenge as well. It’s critical to understand the kind of data that you have now, who is moving it, what it’s being used for, and how it needs to be protected.
Data Catalog Users
Data engineers want to know how any changes will affect the system as a whole. They might ask:
What will be the impact of a schema change in our CRM application?
How different are the PeopleSoft and HCM data structures?
Data scientists want easy access to data and they want to know more about the quality of the data. They are looking for information such as:
Where can I find and explore some geo-location data?
How can I easily access the data in the data lake?
Data stewards are charged with a managed data process. They care about concepts, agreements between stakeholders, and managing the lifecycle of the data itself. They will ask questions such as:
Are we really improving the quality of our operational data?
Have we defined standards for important key data elements?
Chief Data Officers care about who is doing what in the organization. They’re typically not the ones using a data catalog, but they still want to know answers to questions such as:
Who can access customers’ personal information?
Do we have retention policies defined for all data?
What Should a Data Catalog Offer?
A good data catalog should offer:
Search and discovery. A data catalog should have flexible searching and filtering options that let users quickly find relevant sets of data for data science, analytics, or data engineering, or browse metadata based on a technical hierarchy of data assets. Enabling users to enter technical information, user-defined tags, or business terms also improves the search capabilities.
Harvest metadata from various sources. Make sure your data catalog can harvest technical metadata from a variety of connected data assets, including object storage, self-driving databases, on-premises systems, and much more.
Metadata curation. Provide a way for subject matter experts to contribute business knowledge in the form of an enterprise business glossary, tags, associations, user-defined annotations, classifications, ratings, and more.
Automation and data intelligence. At large data scales, AI and machine learning are often a must. Any manual task that can be automated should be automated with AI and machine learning techniques applied to the collected metadata. In addition, AI and machine learning can augment what users do with data, for example by providing data recommendations to data catalog users and to the users of other services in a modern data platform.
Enterprise-class capabilities. Your data is important, and you need enterprise-class capabilities to use it properly, such as identity and access management, and access to the main capabilities via REST APIs. REST access also means that customers and partners can contribute metadata (such as custom harvesters) and expose data catalog capabilities in their own applications.
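The search-and-discovery requirement above can be illustrated with a minimal sketch. The class CatalogSearchSketch, its Entry type, and the sample asset names and tags are all hypothetical, invented for illustration; a real catalog would typically back this with an index rather than a linear scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: find catalog entries by name fragment or user-defined tag.
class CatalogSearchSketch {
    // One catalog entry: a technical name plus user-defined tags (business metadata).
    static final class Entry {
        final String name;
        final Set<String> tags;

        Entry(String name, Set<String> tags) {
            this.name = name;
            this.tags = tags;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void register(String name, Set<String> tags) {
        entries.add(new Entry(name, tags));
    }

    // Returns the names of entries whose name contains the keyword or that carry
    // the keyword as a tag (case-insensitive), in registration order.
    List<String> search(String keyword) {
        String k = keyword.toLowerCase(Locale.ROOT);
        List<String> hits = new ArrayList<>();
        for (Entry e : entries) {
            boolean nameHit = e.name.toLowerCase(Locale.ROOT).contains(k);
            boolean tagHit = e.tags.stream().anyMatch(t -> t.equalsIgnoreCase(k));
            if (nameHit || tagHit) {
                hits.add(e.name);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        CatalogSearchSketch catalog = new CatalogSearchSketch();
        catalog.register("crm.customers", Set.of("pii", "sales"));
        catalog.register("geo.locations", Set.of("geo"));
        System.out.println(catalog.search("pii"));
        System.out.println(catalog.search("geo"));
    }
}
```

Matching on both the technical name and the tags reflects the point that user-supplied tags and business terms widen what a search can find.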
How Hive Stores Metadata
The Metastore provides two important but often overlooked features of a data warehouse: data abstraction and data discovery. Without the data abstractions provided in Hive, a user has to provide information about data formats, extractors and loaders along with the query. In Hive, this information is given during table creation and reused every time the table is referenced. This is very similar to the traditional warehousing systems. The second functionality, data discovery, enables users to discover and explore relevant and specific data in the warehouse. Other tools can be built using this metadata to expose and possibly enhance the information about the data and its availability. Hive accomplishes both of these features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.
Table structure (HMS 2.x)
Access interface definition (incomplete)
Very thorough, right?
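The data abstraction described above, where format details are given once at table creation and reused on every reference, can be sketched as follows. This is not Hive code: MetastoreAbstractionSketch, TableInfo, and planScan are hypothetical names, and the location and format strings are sample values only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Hive code: format details are recorded once at table
// creation and looked up by table name every time the table is referenced.
class MetastoreAbstractionSketch {
    // Storage details a user would otherwise have to repeat with every query.
    static final class TableInfo {
        final String location;
        final String inputFormat;
        final String serde;

        TableInfo(String location, String inputFormat, String serde) {
            this.location = location;
            this.inputFormat = inputFormat;
            this.serde = serde;
        }
    }

    private final Map<String, TableInfo> tables = new HashMap<>();

    // Called once, at table creation.
    void createTable(String name, TableInfo info) {
        if (tables.putIfAbsent(name, info) != null) {
            throw new IllegalStateException("Table already exists: " + name);
        }
    }

    // Called on every reference; the query supplies only the table name.
    String planScan(String tableName) {
        TableInfo t = tables.get(tableName);
        if (t == null) {
            throw new IllegalArgumentException("Unknown table: " + tableName);
        }
        return "scan " + t.location + " with " + t.inputFormat + " and " + t.serde;
    }

    public static void main(String[] args) {
        MetastoreAbstractionSketch metastore = new MetastoreAbstractionSketch();
        metastore.createTable("logs",
                new TableInfo("hdfs://nn:8020/warehouse/path/logs", "TextInputFormat", "LazySimpleSerDe"));
        System.out.println(metastore.planScan("logs"));
    }
}
```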
How Flink Implements Catalog
interface definition
void open() throws CatalogException;
void close() throws CatalogException;
String getDefaultDatabase() throws CatalogException;
List<String> listDatabases() throws CatalogException;
CatalogDatabase getDatabase(String databaseName);
boolean databaseExists(String databaseName) throws CatalogException;
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException;
CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
boolean tableExists(ObjectPath tablePath) throws CatalogException;
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists);
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec);
void createPartition(
        ObjectPath tablePath,
        CatalogPartitionSpec partitionSpec,
        CatalogPartition partition,
        boolean ignoreIfExists);
void dropPartition(
        ObjectPath tablePath,
        CatalogPartitionSpec partitionSpec,
        boolean ignoreIfNotExists);
List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException;
boolean functionExists(ObjectPath functionPath) throws CatalogException;
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
CatalogTableStatistics getTableStatistics(ObjectPath tablePath);
CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath);
CatalogTableStatistics getPartitionStatistics(
        ObjectPath tablePath,
        CatalogPartitionSpec partitionSpec);
...
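Flink Catalog has three metadata storage implementations: HiveCatalog, JdbcCatalog, and MemoryCatalog (the in-memory catalog, GenericInMemoryCatalog in the Flink codebase).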
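The ignoreIfExists and ignoreIfNotExists flags in the interface above follow a common pattern: a conflicting call either becomes a silent no-op or raises an error. The sketch below illustrates that pattern with a plain-Java, in-memory stand-in. It is not Flink's implementation; InMemoryCatalogSketch and its simplified signatures (plain strings instead of ObjectPath and CatalogDatabase, IllegalStateException instead of Flink's checked exceptions) are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory catalog; signatures are simplified and are not Flink's.
class InMemoryCatalogSketch {
    // database name -> table names, both kept sorted for stable listings
    private final Map<String, Set<String>> databases = new TreeMap<>();

    boolean databaseExists(String name) {
        return databases.containsKey(name);
    }

    void createDatabase(String name, boolean ignoreIfExists) {
        if (databaseExists(name)) {
            if (ignoreIfExists) {
                return; // silent no-op
            }
            throw new IllegalStateException("Database already exists: " + name);
        }
        databases.put(name, new TreeSet<>());
    }

    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) {
        Set<String> tables = databases.get(name);
        if (tables == null) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new IllegalStateException("Database does not exist: " + name);
        }
        if (!tables.isEmpty() && !cascade) {
            throw new IllegalStateException("Database is not empty: " + name);
        }
        databases.remove(name);
    }

    void createTable(String database, String table, boolean ignoreIfExists) {
        Set<String> tables = requireDatabase(database);
        if (tables.contains(table)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Table already exists: " + table);
        }
        tables.add(table);
    }

    List<String> listDatabases() {
        return new ArrayList<>(databases.keySet());
    }

    List<String> listTables(String database) {
        return new ArrayList<>(requireDatabase(database));
    }

    private Set<String> requireDatabase(String database) {
        Set<String> tables = databases.get(database);
        if (tables == null) {
            throw new IllegalStateException("Database does not exist: " + database);
        }
        return tables;
    }
}
```

Because everything lives in a map inside one process, this kind of catalog loses its contents on restart and cannot be shared with another engine, which is the limitation that motivates a persistent metastore.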
usage
// hiveConfDir is the directory that contains hive-site.xml
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(catalogName, hiveCatalog);
tableEnv.useCatalog(catalogName);
tableEnv.executeSql("CREATE TABLE...");
-- register a mysql table 'users' in Flink SQL
-- create catalog mycatalog
-- create database mydatabase
CREATE TABLE MyTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
Relationship between catalog and connector (data source)
How Trino Implements Catalog
interface definition
public class Catalog {
    private final String catalogName;
    private final CatalogName connectorCatalogName;
    private final String connectorName;
    private final Connector connector;
    private final CatalogName informationSchemaId;
    private final Connector informationSchema;
    private final CatalogName systemTablesId;
    private final Connector systemTables;
}
#Connector.class
ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit);
ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle);
ConnectorSplitManager getSplitManager();
ConnectorPageSourceProvider getPageSourceProvider();
ConnectorRecordSetProvider getRecordSetProvider();
ConnectorPageSinkProvider getPageSinkProvider();
ConnectorIndexProvider getIndexProvider();
ConnectorNodePartitioningProvider getNodePartitioningProvider();
Set<SystemTable> getSystemTables();
Set<Procedure> getProcedures();
Set<TableProcedureMetadata> getTableProcedures();
List<PropertyMetadata<?>> getSessionProperties();
List<PropertyMetadata<?>> getSchemaProperties();
List<PropertyMetadata<?>> getAnalyzeProperties();
List<PropertyMetadata<?>> getTableProperties();
List<PropertyMetadata<?>> getMaterializedViewProperties();
List<PropertyMetadata<?>> getColumnProperties();
ConnectorAccessControl getAccessControl();
Iterable<EventListener> getEventListeners();
void commit(ConnectorTransactionHandle transactionHandle);
void rollback(ConnectorTransactionHandle transactionHandle);
boolean isSingleStatementWritesOnly();
void shutdown();
Set<ConnectorCapabilities> getCapabilities();
usage
show catalogs;
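-- Trino selects the catalog together with a schema: USE catalog.schema
-- ("default" below is an assumed schema name)
USE hive.default;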
show schemas;
USE information_schema;
show tables;
...
Relationship between catalog and connector (data source)
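In Trino, each catalog name is bound to one configured connector, and a fully qualified table name of the form catalog.schema.table is routed by its first part. The sketch below illustrates that routing in plain Java. CatalogRegistrySketch and its methods are hypothetical and do not mirror Trino's internal classes; the catalog and connector names are sample values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: each catalog name maps to exactly one connector.
class CatalogRegistrySketch {
    private final Map<String, String> connectorByCatalog = new LinkedHashMap<>();

    // Binds a catalog name to a connector; a name can be bound only once.
    void addCatalog(String catalogName, String connectorName) {
        if (connectorByCatalog.putIfAbsent(catalogName, connectorName) != null) {
            throw new IllegalStateException("Catalog already registered: " + catalogName);
        }
    }

    // Routes "catalog.schema.table" to the connector bound to the catalog part.
    String connectorFor(String qualifiedTableName) {
        String[] parts = qualifiedTableName.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected catalog.schema.table but got: " + qualifiedTableName);
        }
        String connector = connectorByCatalog.get(parts[0]);
        if (connector == null) {
            throw new IllegalArgumentException("Unknown catalog: " + parts[0]);
        }
        return connector;
    }

    public static void main(String[] args) {
        CatalogRegistrySketch registry = new CatalogRegistrySketch();
        registry.addCatalog("hive", "hive");
        registry.addCatalog("orders_db", "mysql");
        System.out.println(registry.connectorFor("orders_db.sales.orders"));
    }
}
```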
How Iceberg Implements Catalog
interface definition
List<TableIdentifier> listTables(Namespace namespace);
Table createTable(
        TableIdentifier identifier,
        Schema schema,
        PartitionSpec spec,
        String location,
        Map<String, String> properties);
boolean tableExists(TableIdentifier identifier);
void renameTable(TableIdentifier from, TableIdentifier to);
boolean dropTable(TableIdentifier identifier, boolean purge);
boolean dropTable(TableIdentifier identifier);
Table loadTable(TableIdentifier identifier);
TableBuilder buildTable(TableIdentifier identifier, Schema schema);
void initialize(String name, Map<String, String> properties);
Transaction newCreateTableTransaction(
        TableIdentifier identifier,
        Schema schema,
        PartitionSpec spec);
Transaction newReplaceTableTransaction(
        TableIdentifier identifier,
        Schema schema,
        boolean orCreate);
...
Iceberg Catalog metadata storage implementations include Hive Catalog, Hadoop Catalog, CachingCatalog, JDBC Catalog, and custom catalogs.
usage
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://nn:8020/warehouse/path'
);
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://nn:8020/warehouse/path',
'property-version'='1'
);
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
);
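The three statements above select a catalog implementation in two ways: by a built-in 'catalog-type' or by a custom 'catalog-impl' class name. A resolver for those two properties might look like the sketch below. IcebergCatalogChoiceSketch and resolveImpl are hypothetical names, and rejecting the case where both or neither property is set is a choice made for this sketch, not a statement about Iceberg's own loader; only the two returned class names refer to real Iceberg classes.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical resolver for the 'catalog-type' / 'catalog-impl' properties.
class IcebergCatalogChoiceSketch {
    // Returns the fully qualified class name of the catalog implementation to load.
    static String resolveImpl(Map<String, String> props) {
        String impl = props.get("catalog-impl");
        String type = props.get("catalog-type");
        if (impl != null && type != null) {
            // Assumption of this sketch: the two selectors are mutually exclusive.
            throw new IllegalArgumentException("Set either catalog-type or catalog-impl, not both");
        }
        if (impl != null) {
            return impl; // custom implementation supplied by the user
        }
        if (type == null) {
            // Assumption of this sketch: no default is applied.
            throw new IllegalArgumentException("Neither catalog-type nor catalog-impl is set");
        }
        switch (type.toLowerCase(Locale.ROOT)) {
            case "hive":
                return "org.apache.iceberg.hive.HiveCatalog";
            case "hadoop":
                return "org.apache.iceberg.hadoop.HadoopCatalog";
            default:
                throw new IllegalArgumentException("Unknown catalog-type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveImpl(Map.of("type", "iceberg", "catalog-type", "hive")));
        System.out.println(resolveImpl(Map.of("type", "iceberg", "catalog-impl", "com.my.custom.CatalogImpl")));
    }
}
```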
Why Unify the Data Catalog
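A unified catalog lets different engines share metadata, which is what truly separates storage from compute;
Flink, Iceberg, and Trino each support several catalog backend storage implementations, and not every catalog type is suitable for production, so an enterprise deployment needs to settle on one type;
Hive Metastore is a relatively mature metadata management system, and Flink, Iceberg, and Trino are all compatible with it;
When no database or catalog is specified, Flink uses MemoryCatalog by default, but that metadata cannot be persisted and cannot be reused by other systems;
In Flink, the relationship between a catalog and a connector or data source can be one-to-one, many-to-one, or one-to-many, whereas in Trino it is one-to-one. This mismatch makes access control difficult, but access control at catalog granularity is comparatively easy, because HMS 3.x supports multiple catalogs at the storage level;
In practice, a unified metadata system based on Hive Metastore also needs to support third-party metadata systems, such as different versions of HMS, AWS Glue, and so on;
An HMS-based metadata management system covers only technical metadata, yet real applications may also care about business metadata, which requires the metadata management system to be extensible.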
How to Unify the Data Catalog
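Use HMS uniformly as the base metadata storage component;
When creating a catalog, always call the HMS (3.x) interface to persist it;
Wrap the different HMS versions and third-party stores, and expose a unified operation interface.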
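The third step, wrapping different HMS versions and third-party stores behind one interface, can be sketched as a small facade. Everything here is hypothetical: UnifiedCatalogSketch, MetastoreBackend, and the catalog names are invented for illustration, and a real backend would call the HMS Thrift API or the AWS Glue API instead of returning fixed lists.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical facade: one entry point that dispatches to per-catalog backends.
class UnifiedCatalogSketch {
    // One implementation per metastore kind or version (HMS 2.x, HMS 3.x, Glue, ...).
    interface MetastoreBackend {
        List<String> listTables(String database);
    }

    private final Map<String, MetastoreBackend> backends = new LinkedHashMap<>();

    // Registers a backend under a catalog name; names must be unique.
    void registerCatalog(String catalogName, MetastoreBackend backend) {
        if (backends.putIfAbsent(catalogName, backend) != null) {
            throw new IllegalStateException("Catalog already registered: " + catalogName);
        }
    }

    // Callers use the same method regardless of which store holds the metadata.
    List<String> listTables(String catalogName, String database) {
        MetastoreBackend backend = backends.get(catalogName);
        if (backend == null) {
            throw new IllegalArgumentException("Unknown catalog: " + catalogName);
        }
        return backend.listTables(database);
    }

    List<String> listCatalogs() {
        return List.copyOf(backends.keySet());
    }

    public static void main(String[] args) {
        UnifiedCatalogSketch unified = new UnifiedCatalogSketch();
        // Stand-in backends that return fixed data instead of calling a real metastore.
        unified.registerCatalog("hms3", database -> List.of("orders", "users"));
        unified.registerCatalog("glue", database -> List.of("events"));
        System.out.println(unified.listCatalogs());
        System.out.println(unified.listTables("glue", "analytics"));
    }
}
```

Keeping the backend interface narrow means a new metastore kind can be added by writing one adapter, without changing the engines that call the facade.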
Conclusion
Simply put, a data catalog is an organized inventory of data assets. Technically speaking, a data catalog is a set of operations on metadata, including CRUD operations. As the bridge between storage and computation, the data catalog plays a pivotal role. In scenarios where storage and computation are separated, Flink, Trino, and Iceberg all support Hive Metastore as their metastore component, although they also support many other metastores. On top of HMS, Flink, Trino, and Iceberg each implement their own data catalog. However, for several critical reasons, such as tenant-level access control, metadata reuse across engines, and service capabilities, it is necessary to unify these catalogs. In our case, HMS is used as the base metastore and encapsulated as a service component that supports different metastore versions and kinds.
References:
https://www.oracle.com/big-data/what-is-a-data-catalog/
https://cwiki.apache.org/confluence/display/Hive/Design#Design-MetadataObjects