Hive Metastore Server Implementation Logic
Posted by 咬定青松
Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers the implementation logic of the Hive Metastore Server; we hope you find it useful.
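Why do people love beauty? "Heat cannot pass from a colder body to a hotter one without some other change or expenditure of energy": this is the second law of thermodynamics as stated by Clausius in 1850. The law sets the direction in which everything in the universe evolves: toward disorder, and irreversibly. In technical terms it is the law of increasing entropy. Entropy increase is everywhere in daily life: left alone, hot water slowly cools, a tidy room gradually becomes messy, passionate feelings fade, and vigorous life gradually wanes... The law shows us how everything ends, and that end is destruction, whether for the Earth, the universe or intelligent life such as humans. Seen on the largest scale, vast as the universe is, it keeps growing more disordered over time until it reaches heat death, its final fate. The law of increasing entropy may be the most hopeless law discovered so far, because no external force can stop it forever; it can only be delayed.
If that is so, should humanity just give up and "lie flat"? Autumn cicadas sing until they die, and that is their calling; the night-blooming cereus flowers only briefly, but it did bloom. In the modern struggles for national liberation the cost was terrible, yet people of conviction kept stepping forward one after another, because they were fighting for freedom. Even in today's fight against the epidemic, the country has not given up on anyone, whatever their age, even when this sometimes seems to go against "objective laws", especially the survival of the fittest that the West firmly believes in. We have not left them to fend for themselves as Western countries did; that is the country holding to its highest mission and ideals.
The universe is infinite but will ultimately fall silent; life is finite and will end one day. Yet that is no reason to lie flat. People have every reason to live the present well, to appreciate everything beautiful, and to become part of that beauty themselves. In Greek mythology Sisyphus pushes a boulder up to the summit again and again and never succeeds for good, yet that is his higher fidelity: he negates the gods, and the struggle itself toward the heights is enough to fill a man's heart.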
This article was first published on the WeChat official account 码上观世界.
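This article walks through the implementation logic of the Hive Metastore Server. It covers:
HiveMetastore startup logic
The communication protocol between the Metastore client and server
The implementation logic of the Metastore Server
Note: this article is based on Hive Metastore Server 3.1.2.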
HiveMetastore Startup Logic
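HiveMetastore is started from the command line. Startup involves two things, loading the configuration and starting the listening service, which correspond to the newMetastoreConf and startMetaStore methods in the code below.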
// HiveMetaStore.java (excerpt)
public static void main(String[] args) throws Throwable {
  final Configuration conf = MetastoreConf.newMetastoreConf();
  HiveMetastoreCli cli = new HiveMetastoreCli(conf);
  cli.parse(args);
  // --hiveconf key=value pairs are copied into the JVM system properties and returned
  Properties hiveconf = cli.addHiveconfToSystemProperties();
  // set all properties specified on the command line
  for (Map.Entry<Object, Object> item : hiveconf.entrySet()) {
    conf.set((String) item.getKey(), (String) item.getValue());
  }
  // Lets background threads wait until the Thrift server has started serving
  Lock startLock = new ReentrantLock();
  Condition startCondition = startLock.newCondition();
  AtomicBoolean startedServing = new AtomicBoolean();
  startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf, startLock,
      startCondition, startedServing);
}
1. Loading the configuration
public static Configuration newMetastoreConf() {
  Configuration conf = new Configuration();
  ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  if (classLoader == null) {
    classLoader = MetastoreConf.class.getClassLoader();
  }
  // We don't add this to the resources because we don't want to read config values from it.
  // But we do find it because we want to remember where it is for later in case anyone calls
  // getHiveDefaultLocation().
  hiveDefaultURL = classLoader.getResource("hive-default.xml");
  // Add in hive-site.xml. We add this first so that it gets overridden by the new metastore
  // specific files if they exist.
  if (hiveSiteURL == null) {
    /*
     * this 'if' is pretty lame - QTestUtil.QTestUtil() uses hiveSiteURL to load a specific
     * hive-site.xml from data/conf/<subdir> so this makes it follow the same logic - otherwise
     * HiveConf and MetastoreConf may load different hive-site.xml ( For example,
     * HiveConf uses data/conf/spark/hive-site.xml and MetastoreConf data/conf/hive-site.xml)
     */
    hiveSiteURL = findConfigFile(classLoader, "hive-site.xml");
  }
  if (hiveSiteURL != null) {
    conf.addResource(hiveSiteURL);
  }
  // Now add hivemetastore-site.xml. Again we add this before our own config files so that the
  // newer overrides the older.
  hiveMetastoreSiteURL = findConfigFile(classLoader, "hivemetastore-site.xml");
  if (hiveMetastoreSiteURL != null) {
    conf.addResource(hiveMetastoreSiteURL);
  }
  // Add in our conf file
  metastoreSiteURL = findConfigFile(classLoader, "metastore-site.xml");
  if (metastoreSiteURL != null) {
    conf.addResource(metastoreSiteURL);
  }
  // If a system property that matches one of our conf value names is set then use the value
  // it's set to to set our own conf value.
  for (ConfVars var : ConfVars.values()) {
    if (System.getProperty(var.varname) != null) {
      LOG.debug("Setting conf value " + var.varname + " using value " +
          System.getProperty(var.varname));
      conf.set(var.varname, System.getProperty(var.varname));
    }
  }
  // Pick up any system properties that start with "hive." and set them in our config. This
  // way we can properly pull any Hive values from the environment without needing to know all
  // of the Hive config values.
  System.getProperties().stringPropertyNames().stream()
      .filter(s -> s.startsWith("hive."))
      .forEach(s -> {
        String v = System.getProperty(s);
        LOG.debug("Picking up system property " + s + " with value " + v);
        conf.set(s, v);
      });
  // If we are going to validate the schema, make sure we don't create it
  if (getBoolVar(conf, ConfVars.SCHEMA_VERIFICATION)) {
    setBoolVar(conf, ConfVars.AUTO_CREATE_ALL, false);
  }
  if (!beenDumped.getAndSet(true) && getBoolVar(conf, ConfVars.DUMP_CONFIG_ON_CREATION) &&
      LOG.isDebugEnabled()) {
    LOG.debug(dumpConfig(conf));
  }
  return conf;
}
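From the code, configuration values are loaded in the following order, each source overriding the ones before it:
hive-site.xml
hivemetastore-site.xml
metastore-site.xml
JVM system properties whose names match a metastore configuration variable or start with "hive."
hive-default.xml is only located (so that getHiveDefaultLocation() can report where it is) and is not added as a resource, so its values are not read. Configuration can therefore also be supplied at startup as JVM system properties (for example -D options), not environment variables; values passed with --hiveconf on the command line are copied into the system properties and also set directly on conf in main().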
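The override order can be modeled with plain maps. The sketch below is a minimal illustration under stated assumptions, not the MetastoreConf code: LayeredConfSketch and build are hypothetical names, the XML files are assumed to be parsed into maps already, and knownNames stands in for the ConfVars variable names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Hypothetical model of the precedence in MetastoreConf.newMetastoreConf().
 * Plain maps stand in for Hadoop's Configuration and the parsed XML files.
 */
final class LayeredConfSketch {
  private LayeredConfSketch() {}

  /**
   * @param siteFiles  parsed site files in load order: hive-site.xml,
   *                   hivemetastore-site.xml, metastore-site.xml
   * @param sysProps   JVM system properties (-Dkey=value, or --hiveconf values)
   * @param knownNames names of the metastore's own config variables (assumed)
   */
  static Map<String, String> build(List<Map<String, String>> siteFiles,
                                   Properties sysProps, Set<String> knownNames) {
    Map<String, String> conf = new LinkedHashMap<>();
    // Site files: a file loaded later overrides keys set by an earlier one.
    for (Map<String, String> file : siteFiles) {
      conf.putAll(file);
    }
    // System properties override the files, but only for known metastore
    // variable names or for names starting with "hive.".
    for (String name : sysProps.stringPropertyNames()) {
      if (knownNames.contains(name) || name.startsWith("hive.")) {
        conf.put(name, sysProps.getProperty(name));
      }
    }
    return conf;
  }
}
```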
2. Starting the listening service
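// HiveMetaStore.java (excerpt)
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
    Configuration conf, Lock startLock, Condition startCondition,
    AtomicBoolean startedServing) throws Throwable {
  // (elided: transFactory, protocolFactory, inputProtoFactory, minWorkerThreads and
  // maxWorkerThreads are derived from conf, e.g. SASL/SSL, framed transport,
  // compact protocol and thread-pool bounds)
  HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf,
      false);
  // Wrap the handler in a dynamic proxy that retries failed calls
  IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
  // Plain (non-SASL, non-SSL) server socket; the other branches are elided
  TServerSocket serverSocket = SecurityUtils.getServerSocket(null, port);
  TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
      .processor(new TSetIpAddressProcessor<>(handler))
      .transportFactory(transFactory)
      .protocolFactory(protocolFactory)
      .inputProtocolFactory(inputProtoFactory)
      .minWorkerThreads(minWorkerThreads)
      .maxWorkerThreads(maxWorkerThreads);
  TServer tServer = new TThreadPoolServer(args);
  // Blocks; each client connection is handled by a worker thread from the pool
  tServer.serve();
}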
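This starts a Thrift server (TThreadPoolServer). The server-side logic lives mainly in HMSHandler, which implements the Iface interface (ThriftHiveMetastore.Iface) and provides access to the metadata stored by the Hive Metastore. The handler is not used directly: newRetryingHMSHandler wraps it in a retrying proxy, and that proxy becomes a member of the Processor (here TSetIpAddressProcessor). The Processor handles the interaction with client requests and calls the handler according to the method requested. HMSHandler and Processor are described in detail below.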
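newRetryingHMSHandler wraps the handler in a java.lang.reflect.Proxy whose InvocationHandler (RetryingHMSHandler) retries failed calls. Below is a JDK-only sketch of that pattern. MetaService, RetryingProxySketch and RetriableException are hypothetical; the real class decides which persistence-layer exceptions are worth retrying and waits between attempts, both of which this sketch leaves out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical service interface standing in for IHMSHandler. */
interface MetaService {
  String getDatabase(String name);
}

/** Hypothetical JDK-only retrying proxy, in the spirit of RetryingHMSHandler. */
final class RetryingProxySketch implements InvocationHandler {
  /** Hypothetical marker for failures worth retrying (e.g. a lost DB connection). */
  static final class RetriableException extends RuntimeException {
    RetriableException(String msg) {
      super(msg);
    }
  }

  private final Object target;
  private final int maxAttempts;

  private RetryingProxySketch(Object target, int maxAttempts) {
    this.target = target;
    this.maxAttempts = maxAttempts;
  }

  /** Returns a proxy implementing iface that forwards every call to target. */
  @SuppressWarnings("unchecked")
  static <T> T wrap(Class<T> iface, T target, int maxAttempts) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
        new RetryingProxySketch(target, maxAttempts));
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    for (int attempt = 1; ; attempt++) {
      try {
        return method.invoke(target, args);
      } catch (InvocationTargetException e) {
        Throwable cause = e.getCause();
        // Rethrow non-retriable failures, and retriable ones once attempts run out.
        if (!(cause instanceof RetriableException) || attempt >= maxAttempts) {
          throw cause;
        }
      }
    }
  }
}
```

Callers use the object returned by wrap exactly like the unwrapped one, so in this sketch the service implementation itself needs no retry logic.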
Communication Protocol Between the Metastore Client and Server
1. HMSHandler
HMSHandler implements the Iface interface (through IHMSHandler), as its declarations show:
public static class HMSHandler extends FacebookBase implements IHMSHandler { ... }
public interface IHMSHandler extends ThriftHiveMetastore.Iface, Configurable { ... }
2. Processor
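The abstract processor interface, TProcessor, has a single method that processes a request. Both of its parameters are TProtocol instances: the first reads the request and the second writes the response.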
// org.apache.thrift.TProcessor and TBaseProcessor (libthrift, excerpt)
public interface TProcessor {
  boolean process(TProtocol in, TProtocol out) throws TException;
}

public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;
  private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }

  public Map<String, ProcessFunction<I, ? extends TBase>> getProcessMapView() {
    return Collections.unmodifiableMap(this.processMap);
  }

  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    // The message header carries the method name, the message type and the sequence id
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = this.processMap.get(msg.name);
    if (fn == null) {
      // Unknown method: skip the argument struct and reply with an exception
      TProtocolUtil.skip(in, TType.STRUCT);                         // TType.STRUCT == 12
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(
          TApplicationException.UNKNOWN_METHOD,                     // == 1
          "Invalid method name: '" + msg.name + "'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); // == 3
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    // Known method: delegate to its ProcessFunction
    fn.process(msg.seqid, in, out, this.iface);
    return true;
  }
}
processMap holds the mapping from each method name to its ProcessFunction. The mapping is populated by the generated Processor subclass:
// ThriftHiveMetastore.java (generated, excerpt)
public static class Processor<I extends ThriftHiveMetastore.Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

  public Processor(I iface) {
    super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
  }

  protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
    super(iface, getProcessMap(processMap));
  }

  private static <I extends ThriftHiveMetastore.Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
    processMap.put("getMetaConf", new ThriftHiveMetastore.Processor.getMetaConf());
    processMap.put("setMetaConf", new ThriftHiveMetastore.Processor.setMetaConf());
    processMap.put("create_catalog", new ThriftHiveMetastore.Processor.create_catalog());
    processMap.put("alter_catalog", new ThriftHiveMetastore.Processor.alter_catalog());
    processMap.put("get_catalog", new ThriftHiveMetastore.Processor.get_catalog());
    processMap.put("get_catalogs", new ThriftHiveMetastore.Processor.get_catalogs());
    processMap.put("drop_catalog", new ThriftHiveMetastore.Processor.drop_catalog());
    processMap.put("create_database", new ThriftHiveMetastore.Processor.create_database());
    processMap.put("get_database", new ThriftHiveMetastore.Processor.get_database());
    processMap.put("drop_database", new ThriftHiveMetastore.Processor.drop_database());
    // ... (one entry for every remaining method of the service)
    return processMap;
  }
}
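processMap plus TBaseProcessor.process amounts to a dispatch table keyed by method name. A transport-free sketch of that lookup follows; DispatchSketch and Message are hypothetical, payloads are plain objects rather than Thrift structs, and only the message-type numbers follow Thrift's TMessageType.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, transport-free model of TBaseProcessor.process(). */
final class DispatchSketch {
  // Message types, numbered like Thrift's TMessageType.
  static final byte CALL = 1;
  static final byte REPLY = 2;
  static final byte EXCEPTION = 3;

  /** A message: header (name, type, seqid) plus an already-decoded payload. */
  static final class Message {
    final String name;
    final byte type;
    final int seqid;
    final Object payload;

    Message(String name, byte type, int seqid, Object payload) {
      this.name = name;
      this.type = type;
      this.seqid = seqid;
      this.payload = payload;
    }
  }

  // Method name -> handler, playing the role of processMap.
  private final Map<String, Function<Object, Object>> processMap = new HashMap<>();

  DispatchSketch register(String methodName, Function<Object, Object> fn) {
    processMap.put(methodName, fn);
    return this;
  }

  Message process(Message in) {
    Function<Object, Object> fn = processMap.get(in.name);
    if (fn == null) {
      // Unknown method: reply with an EXCEPTION message, as Thrift does for UNKNOWN_METHOD.
      return new Message(in.name, EXCEPTION, in.seqid,
          "Invalid method name: '" + in.name + "'");
    }
    // Known method: run it and echo the seqid so the client can match the reply.
    return new Message(in.name, REPLY, in.seqid, fn.apply(in.payload));
  }
}
```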
Each concrete function class extends the abstract class ProcessFunction (also from libthrift):
// org.apache.thrift.ProcessFunction (libthrift, excerpt)
public abstract class ProcessFunction<I, T extends TBase> {
  private final String methodName;
  private static final Logger LOGGER = LoggerFactory.getLogger(ProcessFunction.class.getName());

  public ProcessFunction(String methodName) {
    this.methodName = methodName;
  }

  public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    // 1. Deserialize the argument struct
    T args = this.getEmptyArgsInstance();
    try {
      args.read(iprot);
    } catch (TProtocolException e) {
      iprot.readMessageEnd();
      TApplicationException x = new TApplicationException(
          TApplicationException.PROTOCOL_ERROR, e.getMessage());        // == 7
      oprot.writeMessageBegin(new TMessage(this.getMethodName(), TMessageType.EXCEPTION, seqid));
      x.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
      return;
    }
    iprot.readMessageEnd();
    // 2. Invoke the handler
    TBase result = null;
    try {
      result = this.getResult(iface, args);
    } catch (TException e) {
      LOGGER.error("Internal error processing " + this.getMethodName(), e);
      TApplicationException x = new TApplicationException(
          TApplicationException.INTERNAL_ERROR,                         // == 6
          "Internal error processing " + this.getMethodName());
      oprot.writeMessageBegin(new TMessage(this.getMethodName(), TMessageType.EXCEPTION, seqid));
      x.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
      return;
    }
    // 3. Write the reply, unless the method is oneway
    if (!this.isOneway()) {
      oprot.writeMessageBegin(new TMessage(this.getMethodName(), TMessageType.REPLY, seqid)); // == 2
      result.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
    }
  }

  protected abstract boolean isOneway();

  public abstract TBase getResult(I iface, T args) throws TException;

  public abstract T getEmptyArgsInstance();

  public String getMethodName() {
    return this.methodName;
  }
}
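ProcessFunction.process is a template method: the skeleton (decode the arguments, call getResult, write a reply unless the call is oneway) is fixed, and generated subclasses only fill in the steps. The sketch below keeps that skeleton and its outcomes without any Thrift types. ProcessFunctionSketch, ArgsDecodeException and HandlerFailedException are hypothetical stand-ins for ProcessFunction, TProtocolException and TException, and raw strings stand in for the protocol.

```java
import java.util.Optional;

/** Hypothetical stand-in for TProtocolException: the arguments could not be decoded. */
final class ArgsDecodeException extends Exception {
  ArgsDecodeException(String msg) {
    super(msg);
  }
}

/** Hypothetical stand-in for a TException thrown by the handler. */
final class HandlerFailedException extends Exception {
  HandlerFailedException(String msg) {
    super(msg);
  }
}

/** Hypothetical, Thrift-free version of ProcessFunction's template method. */
abstract class ProcessFunctionSketch<A, R> {
  /** Kinds of reply that process() can produce. */
  enum Kind { REPLY, PROTOCOL_ERROR, INTERNAL_ERROR }

  static final class Outcome {
    final Kind kind;
    final Object value;

    Outcome(Kind kind, Object value) {
      this.kind = kind;
      this.value = value;
    }
  }

  private final String methodName;

  ProcessFunctionSketch(String methodName) {
    this.methodName = methodName;
  }

  // Steps filled in by each concrete function, like the generated subclasses.
  protected abstract A decodeArgs(String raw) throws ArgsDecodeException;

  protected abstract R getResult(A args) throws HandlerFailedException;

  protected abstract boolean isOneway();

  /** The fixed skeleton; an empty Optional means no reply is written. */
  final Optional<Outcome> process(String raw) {
    A args;
    try {
      args = decodeArgs(raw);
    } catch (ArgsDecodeException e) {
      return Optional.of(new Outcome(Kind.PROTOCOL_ERROR, e.getMessage()));
    }
    R result;
    try {
      result = getResult(args);
    } catch (HandlerFailedException e) {
      return Optional.of(new Outcome(Kind.INTERNAL_ERROR,
          "Internal error processing " + methodName));
    }
    return isOneway() ? Optional.empty() : Optional.of(new Outcome(Kind.REPLY, result));
  }
}
```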
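The actual work of each ProcessFunction happens in getResult, which unpacks the args struct, calls the corresponding Iface method and wraps its return value in a result struct. Note that the examples below are taken from TCLIService, the Thrift service of HiveServer2, not from ThriftHiveMetastore; the metastore's generated functions (for example ThriftHiveMetastore.Processor.get_database) follow the same pattern, except that they also catch the exceptions declared in the IDL and store them in the result struct: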
// TCLIService.java (generated, excerpt)
public static class GetCatalogs<I extends TCLIService.Iface> extends ProcessFunction<I, TCLIService.GetCatalogs_args> {
  public GetCatalogs() {
    super("GetCatalogs");
  }
  public TCLIService.GetCatalogs_args getEmptyArgsInstance() {
    return new TCLIService.GetCatalogs_args();
  }
  protected boolean isOneway() {
    return false;
  }
  public TCLIService.GetCatalogs_result getResult(I iface, TCLIService.GetCatalogs_args args) throws TException {
    TCLIService.GetCatalogs_result result = new TCLIService.GetCatalogs_result();
    result.success = iface.GetCatalogs(args.req);
    return result;
  }
}

public static class GetTables<I extends TCLIService.Iface> extends ProcessFunction<I, TCLIService.GetTables_args> {
  public GetTables() {
    super("GetTables");
  }
  public TCLIService.GetTables_args getEmptyArgsInstance() {
    return new TCLIService.GetTables_args();
  }
  protected boolean isOneway() {
    return false;
  }
  public TCLIService.GetTables_result getResult(I iface, TCLIService.GetTables_args args) throws TException {
    TCLIService.GetTables_result result = new TCLIService.GetTables_result();
    result.success = iface.GetTables(args.req);
    return result;
  }
}

public static class GetSchemas<I extends TCLIService.Iface> extends ProcessFunction<I, TCLIService.GetSchemas_args> {
  public GetSchemas() {
    super("GetSchemas");
  }
  public TCLIService.GetSchemas_args getEmptyArgsInstance() {
    return new TCLIService.GetSchemas_args();
  }
  protected boolean isOneway() {
    return false;
  }
  public TCLIService.GetSchemas_result getResult(I iface, TCLIService.GetSchemas_args args) throws TException {
    TCLIService.GetSchemas_result result = new TCLIService.GetSchemas_result();
    result.success = iface.GetSchemas(args.req);
    return result;
  }
}
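In other words, the client and server communicate via Thrift RPC. For each call the client sends a message whose header carries the method name, the message type and a sequence id, followed by the serialized argument struct; the encoding is determined by the configured Thrift protocol and transport. The server looks up the ProcessFunction by method name, deserializes the arguments, invokes the handler and writes back a REPLY (or an EXCEPTION) message carrying the same sequence id.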
Metastore Server Implementation Logic
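As described above, each concrete ProcessFunction performs the actual work through iface, which here is the HMSHandler initialized earlier (wrapped in its retrying proxy).
The following common operations show how HMSHandler implements them: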
1. create_catalog
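create_catalog creates a catalog; catalogs are supported starting with HMS 3.x. Its main steps are:
Create the storage directory at the location specified by the catalog;
Persist the catalog information in the metastore database;
Create the catalog's default database.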
@Override
public void create_catalog(CreateCatalogRequest rqst) throws TException {
  Catalog catalog = rqst.getCatalog();
  RawStore ms = getMS();
  // (elided: validation, transaction handling and cleanup on failure)
  Path catPath = new Path(catalog.getLocationUri());
  wh.mkdirs(catPath);
  ms.createCatalog(catalog);
  // Create a default database inside the catalog
  Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " +
      catalog.getName(), catalog.getLocationUri(), Collections.emptyMap());
  db.setCatalogName(catalog.getName());
  create_database_core(ms, db);
}

@Override
public GetCatalogResponse get_catalog(GetCatalogRequest rqst) throws TException {
  String catName = rqst.getName();
  Catalog cat = getMS().getCatalog(catName);
  return new GetCatalogResponse(cat);
}
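The three steps can be modeled with an in-memory map and the local filesystem. In the full source they run inside a RawStore transaction, and as far as we can tell from the 3.x code a failure rolls the transaction back and deletes the catalog directory if this call created it; the sketch imitates that cleanup. CatalogStoreSketch is hypothetical, and its maps stand in for the metastore database.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for RawStore plus a local-filesystem warehouse. */
final class CatalogStoreSketch {
  final Map<String, Path> catalogs = new HashMap<>();
  final Map<String, Path> databases = new HashMap<>(); // key: "catalog.database"

  void createCatalog(String name, Path location) throws IOException {
    if (catalogs.containsKey(name)) {
      throw new IllegalStateException("Catalog " + name + " already exists");
    }
    boolean madeDir = false;
    boolean success = false;
    try {
      // Step 1: create the catalog directory unless it already exists.
      if (!Files.isDirectory(location)) {
        Files.createDirectories(location);
        madeDir = true;
      }
      // Step 2: persist the catalog metadata.
      catalogs.put(name, location);
      // Step 3: create the catalog's default database at the catalog location.
      createDatabase(name, "default", location);
      success = true;
    } finally {
      if (!success) {
        // "Rollback": forget the catalog and remove the leaf directory if this call created it.
        catalogs.remove(name);
        if (madeDir) {
          Files.deleteIfExists(location);
        }
      }
    }
  }

  void createDatabase(String catalog, String db, Path location) {
    String key = catalog + "." + db;
    if (databases.containsKey(key)) {
      throw new IllegalStateException("Database " + key + " already exists");
    }
    databases.put(key, location);
  }
}
```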
2. create_database
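create_database implements database creation:
Determine the database's storage path from the catalog's location (unless the database specifies its own location);
Create the storage directory;
Persist the database metadata.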
@Override
public void create_database(final Database db) throws TException {
  RawStore ms = getMS();
  // (elided: default catalog name, validation, transaction handling)
  Catalog cat = ms.getCatalog(db.getCatalogName());
  Path dbPath = wh.determineDatabasePath(cat, db);
  db.setLocationUri(dbPath.toString());
  wh.mkdirs(dbPath);
  ms.createDatabase(db);
}

@Override
public Database get_database(final String name) throws TException {
  // name may carry a catalog prefix; split it into catalog and database names
  String[] parsedDbName = parseDbName(name, conf);
  Database db = getMS().getDatabase(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
  return db;
}
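A simplified sketch of the path rule, assuming that an explicit LOCATION wins and that otherwise the directory is named after the lower-cased database name plus ".db" under the catalog location. DbPathSketch is hypothetical, and Warehouse.determineDatabasePath in the real source handles additional cases for the default catalog that are omitted here.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

/** Hypothetical, simplified version of the database-path rule. */
final class DbPathSketch {
  private DbPathSketch() {}

  static Path determineDatabasePath(Path catalogLocation, String dbName, String explicitLocation) {
    // An explicit LOCATION given at CREATE DATABASE time is used as-is.
    if (explicitLocation != null && !explicitLocation.isEmpty()) {
      return Paths.get(explicitLocation);
    }
    // Otherwise: <catalog location>/<lower-cased database name>.db
    return catalogLocation.resolve(dbName.toLowerCase(Locale.ROOT) + ".db");
  }
}
```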
3. create_table
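create_table implements table creation:
Create the table's storage directory (views get no directory);
Persist the table metadata, stamped with the DDL time.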
@Override
public void create_table(final Table tbl) throws TException {
  if (!tbl.isSetCatName()) {
    tbl.setCatName(getDefaultCatalog(conf));
  }
  RawStore ms = getMS();
  Database db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
  // If the table has no location, a default path under db's directory is used instead (elided)
  Path tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
  tbl.getSd().setLocation(tblPath.toString());
  wh.mkdirs(tblPath);
  // set create time (seconds since the epoch)
  long time = System.currentTimeMillis() / 1000;
  tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
  ms.createTable(tbl);
}

@Override
public GetTableResult get_table_req(GetTableRequest req) throws TException {
  String catName = req.isSetCatName() ? req.getCatName() : getDefaultCatalog(conf);
  Table t = getMS().getTable(catName, req.getDbName(), req.getTblName());
  return new GetTableResult(t);
}
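Two details of create_table in isolation: where the table directory goes and how the DDL time is recorded. The sketch assumes that a table without an explicit location lives in a subdirectory named after the lower-cased table name under the database directory (the real code also encodes the name, which is skipped here). CreateTableSketch is hypothetical, while transient_lastDdlTime is the value of hive_metastoreConstants.DDL_TIME.

```java
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helpers mirroring two details of create_table. */
final class CreateTableSketch {
  private CreateTableSketch() {}

  /** Key used for the DDL timestamp (the value of hive_metastoreConstants.DDL_TIME). */
  static final String DDL_TIME = "transient_lastDdlTime";

  /** Explicit location if given, otherwise <database dir>/<lower-cased table name>. */
  static Path tablePath(Path dbLocation, String tableName, Path explicitLocation) {
    if (explicitLocation != null) {
      return explicitLocation;
    }
    return dbLocation.resolve(tableName.toLowerCase(Locale.ROOT));
  }

  /** Stores the creation time in seconds since the epoch, as a string parameter. */
  static void stampDdlTime(Map<String, String> params, long nowMillis) {
    params.put(DDL_TIME, Long.toString(nowMillis / 1000));
  }
}
```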
4. drop_table
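drop_table implements table deletion:
Drop the table's partitions;
Drop the table's constraints;
Drop the table metadata;
If deleteData is true and the table is not external, delete the table and partition data once the metadata changes have been committed.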
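@Override
public void drop_table(final String dbname, final String name, final boolean deleteData)
    throws TException {
  // (elided: catName is resolved from dbname; transaction handling)
  RawStore ms = getMS();
  Database db = ms.getDatabase(catName, dbname);
  Table tbl = get_table_core(catName, dbname, name);
  // Drop the partitions, and collect their data paths if deleteData is set (elided)
  // dropPartitions
  // Drop any constraints on the table
  ms.dropConstraint(catName, dbname, name, null, true);
  ms.dropTable(catName, dbname, name);
  // After commit: delete table and partition data if deleteData is set
  // and the table is not external (elided)
}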
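Based on our reading of drop_table_core in 3.x, the metadata changes happen inside one transaction and files are deleted only after it commits, and only for non-external tables when deleteData is true. DropTableSketch is a hypothetical outline that records the steps as strings instead of performing them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outline of the order in which drop_table does its work. */
final class DropTableSketch {
  private DropTableSketch() {}

  static List<String> plan(String table, List<String> partitions,
                           boolean deleteData, boolean external) {
    List<String> steps = new ArrayList<>();
    // Metadata changes, all inside one metastore transaction.
    for (String p : partitions) {
      steps.add("drop partition metadata " + p);
    }
    steps.add("drop constraints of " + table);
    steps.add("drop table metadata " + table);
    steps.add("commit");
    // Files are only touched after a successful commit, and never for external tables.
    if (deleteData && !external) {
      for (String p : partitions) {
        steps.add("delete partition data " + p);
      }
      steps.add("delete table data " + table);
    }
    return steps;
  }
}
```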
5. drop_database
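drop_database implements database deletion:
Unless cascade is set, check that the database contains no tables or functions; in every case, check that the parent directory of the database location is writable;
Drop the functions, then the materialized views (metadata only), then the tables together with their partitions (including their data if deleteData is set);
Drop the database metadata;
If deleteData is set, delete the database directory.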
@Override
public void drop_database(final String dbName, final boolean deleteData, final boolean cascade)
    throws TException {
  // dbName may carry a catalog prefix; split it into catalog and database names
  String[] parsedDbName = parseDbName(dbName, conf);
  String catName = parsedDbName[CAT_NAME];
  String name = parsedDbName[DB_NAME];
  RawStore ms = getMS();
  Database db = ms.getDatabase(catName, name);
  Set<String> uniqueTableNames = new HashSet<>(get_all_tables(dbName));
  List<String> allFunctions = get_functions(dbName, "*");
  if (!cascade) {
    if (!uniqueTableNames.isEmpty()) {
      throw new InvalidOperationException(
          "Database " + db.getName() + " is not empty. One or more tables exist.");
    }
    if (!allFunctions.isEmpty()) {
      throw new InvalidOperationException(
          "Database " + db.getName() + " is not empty. One or more functions exist.");
    }
  }
  Path path = new Path(db.getLocationUri()).getParent();
  if (!wh.isWritable(path)) {
    throw new MetaException("Database not dropped since " +
        path + " is not writable by " +
        SecurityUtils.getUser());
  }
  // Used to tell which table/partition directories lie outside the database directory (elided)
  Path databasePath = wh.getDnsPath(wh.getDatabasePath(db));
  // drop any functions before dropping db (catPrependedName: dbName with its catalog prefix, elided)
  for (String funcName : allFunctions) {
    drop_function(catPrependedName, funcName);
  }
  // materializedViews: the database's materialized views, fetched in batches (elided)
  for (Table materializedView : materializedViews) {
    // Drop the materialized view but not its data
    drop_table(name, materializedView.getTableName(), false);
    uniqueTableNames.remove(materializedView.getTableName());
  }
  // drop tables before dropping db
  List<String> allTables = new ArrayList<>(uniqueTableNames);
  for (String table : allTables) {
    // drop the table and its partitions, and their data if deleteData is set (elided)
  }
  ms.dropDatabase(catName, name);
  // Delete the data in the database
  if (deleteData) {
    wh.deleteDir(new Path(db.getLocationUri()), true, db);
  }
}
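The checks and ordering of drop_database can be outlined the same way. In DropDatabaseSketch (hypothetical) IllegalStateException stands in for InvalidOperationException, the writability check is left out, and the tables list is assumed to include the materialized views, as get_all_tables does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outline of drop_database's checks and ordering. */
final class DropDatabaseSketch {
  private DropDatabaseSketch() {}

  static List<String> plan(String db, List<String> tables, List<String> materializedViews,
                           List<String> functions, boolean deleteData, boolean cascade) {
    // Without cascade, a database that still holds tables or functions is refused.
    if (!cascade) {
      if (!tables.isEmpty()) {
        throw new IllegalStateException(
            "Database " + db + " is not empty. One or more tables exist.");
      }
      if (!functions.isEmpty()) {
        throw new IllegalStateException(
            "Database " + db + " is not empty. One or more functions exist.");
      }
    }
    List<String> steps = new ArrayList<>();
    for (String f : functions) {
      steps.add("drop function " + f);
    }
    // Materialized views first, metadata only.
    for (String mv : materializedViews) {
      steps.add("drop materialized view " + mv);
    }
    // Then the remaining tables (with their partitions).
    for (String t : tables) {
      if (!materializedViews.contains(t)) {
        steps.add("drop table " + t);
      }
    }
    steps.add("drop database metadata " + db);
    if (deleteData) {
      steps.add("delete database directory " + db);
    }
    return steps;
  }
}
```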
6. drop_catalog
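drop_catalog implements catalog deletion:
Check that the catalog contains either no databases or only its default database; any other database makes the call fail;
If only the default database exists, drop it without cascading; this fails if it still contains tables or functions;
Drop the catalog metadata and, once the transaction has committed, delete the catalog directory.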
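@Override
public void drop_catalog(DropCatalogRequest rqst) throws TException {
  String catName = rqst.getName();
  RawStore ms = getMS();
  Catalog cat = ms.getCatalog(catName);
  List<String> allDbs = get_databases(prependNotNullCatToDbName(catName, null));
  if (allDbs != null && !allDbs.isEmpty()) {
    // It might just be the default, in which case we can drop that one if it's empty
    if (allDbs.size() == 1 && allDbs.get(0).equals(DEFAULT_DATABASE_NAME)) {
      // deleteData = true, cascade = false: throws if the default database is not empty
      drop_database_core(ms, catName, DEFAULT_DATABASE_NAME, true, false);
    } else {
      // Non-default databases exist (message simplified)
      throw new InvalidOperationException("Catalog " + catName + " still has non-default databases");
    }
  }
  ms.dropCatalog(catName);
  // In the full source the directory is removed only after the transaction commits
  wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, false);
}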
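Finally, the precondition of drop_catalog as a hypothetical outline. DropCatalogSketch records the steps as strings, and its IllegalStateException and message are stand-ins for the real InvalidOperationException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outline of drop_catalog's precondition and ordering. */
final class DropCatalogSketch {
  private DropCatalogSketch() {}

  static List<String> plan(String catalog, List<String> databases) {
    List<String> steps = new ArrayList<>();
    if (!databases.isEmpty()) {
      if (databases.size() == 1 && databases.get(0).equals("default")) {
        // Non-cascading drop: it still fails if "default" has tables or functions.
        steps.add("drop database default (no cascade)");
      } else {
        throw new IllegalStateException(
            "Catalog " + catalog + " still has non-default databases");
      }
    }
    steps.add("drop catalog metadata " + catalog);
    steps.add("commit");
    // The directory goes last, only after the metadata change is committed.
    steps.add("delete catalog directory " + catalog);
    return steps;
  }
}
```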
Summary
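This article walked through the HMS implementation from the source code, covering three areas: HiveMetastore startup, the communication protocol between the Metastore client and server, and the Metastore Server implementation. It does not cover how the Metastore Server persists the underlying metadata, that is, the create, read, update and delete operations against the backing store. HMS accesses its metadata database through ObjectStore, which is built on JDO (DataNucleus); in the author's view this does not match the development habits of most internet engineers, its performance is hard to guarantee, and some large companies have started replacing it with MyBatis, so describing it in detail is of limited practical value. The startup logic also involves several Thrift server implementations and other important topics; for reasons of space these will be covered in a follow-up article.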