How to Extend the Hive Metastore Thrift RPC Service Interface
Posted by 咬定青松
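Hive has been developed for more than a decade and is widely used. As versions advance, the protocol interfaces change between releases. HMS (Hive Metastore) tries to stay backward compatible, but full compatibility is not guaranteed across major versions. From HMS 2 to HMS 3, for example, the index-related interfaces were replaced by constraint-related ones. In the comparison figure, the left side is the HMS 3 interface definition and the right side is the HMS 2 definition.
As a result, when a Hive 2 client calls an HMS 3 service, the call fails with an Invalid Method exception. A Hive Metastore proxy service can solve this.
Before describing the proxy service in detail, it helps to understand how the Hive Metastore service handles requests: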
// conf is the metastore configuration; serverTransport is an already created TServerTransport.
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
IHMSHandler handler = HiveMetaStore.newRetryingHMSHandler(baseHandler, conf);
TProcessor processor = new ThriftHiveMetastore.Processor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);
TServer server = new TThreadPoolServer(serverArgs);
// getServerEventHandler() is the author's own helper and is not shown in this post.
server.setServerEventHandler(getServerEventHandler());
server.serve();
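The sample code above starts the Hive Metastore service. Two components matter most:
TThreadPoolServer: listens for requests and dispatches them
TProcessor: executes the implementation logic of each RPC interface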
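What TThreadPoolServer Does
TThreadPoolServer listens for requests and dispatches them. When a new client connects, it submits that client's connection to an internal worker thread pool and goes back to listening for the next one. Each worker thread serves exactly one client, processing its requests until the connection is finished: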
// TThreadPoolServer.class (abridged)
public class TThreadPoolServer extends TServer {
    // private ExecutorService executorService_;
    public void serve() {
        ...
        while (!this.stopped_) {
            // Block until a client connects, then hand the connection to the pool.
            TTransport client = this.serverTransport_.accept();
            TThreadPoolServer.WorkerProcess wp = new TThreadPoolServer.WorkerProcess(client);
            this.executorService_.execute(wp);
        }
        ...
        this.executorService_.shutdown();
        ...
    }

    private class WorkerProcess implements Runnable {
        // private TTransport client_;
        public void run() {
            // Keep processing requests from this client until process(...) returns false or the server stops.
            do {
                if (eventHandler != null) {
                    eventHandler.processContext(connectionContext, inputTransport, outputTransport);
                }
            } while (!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));
            if (this.client_.isOpen()) {
                this.client_.close();
            }
        }
    }
}
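The accept-and-dispatch pattern described above can be illustrated without Thrift. The following is only a simplified model under stated assumptions: `ThreadPoolServerSketch`, `Connection`, and `serve` are hypothetical names, a list of connections stands in for `serverTransport_.accept()`, and a queue of strings stands in for the requests arriving on one connection.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Thrift-free model of the accept loop plus worker pool. All names are hypothetical. */
class ThreadPoolServerSketch {

    /** Stand-in for one client connection: an id and the requests it will send. */
    static final class Connection {
        final String clientId;
        final Deque<String> pending;

        Connection(String clientId, String... requests) {
            this.clientId = clientId;
            this.pending = new ArrayDeque<>(Arrays.asList(requests));
        }
    }

    /** Hands each connection to a pool thread; returns a log of handled requests. */
    static List<String> serve(List<Connection> incoming, int workers) {
        List<String> handled = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (Connection client : incoming) {          // models the accept() loop
            pool.execute(() -> {
                // One worker owns one connection until its requests are exhausted.
                String request;
                while ((request = client.pending.poll()) != null) {
                    handled.add(client.clientId + ":" + request);
                }
            });
        }
        pool.shutdown();                              // no new connections accepted
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> handled = serve(Arrays.asList(
                new Connection("a", "get_database", "get_catalogs"),
                new Connection("b", "create_database")), 2);
        System.out.println(handled);
    }
}
```

Requests from the same connection are handled in order by a single worker, while different connections may interleave, which is the one-worker-per-client behaviour the text describes.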
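What TProcessor Does
The previous section described how WorkerProcess works: TThreadPoolServer hands each client connection as a whole to the TProcessor. It did not explain how processor.process locates the specific method being called. Telling method calls apart is in fact also the TProcessor's job. Take ThriftHiveMetastore.Processor as an example; it defines the mapping of supported interfaces: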
// ThriftHiveMetastore.class (abridged)
public static class Processor<I extends ThriftHiveMetastore.Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap()));
    }

    // Note: this constructor accepts a caller-supplied processMap.
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    private static <I extends ThriftHiveMetastore.Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        processMap.put("create_catalog", new ThriftHiveMetastore.Processor.create_catalog());
        processMap.put("alter_catalog", new ThriftHiveMetastore.Processor.alter_catalog());
        processMap.put("get_catalog", new ThriftHiveMetastore.Processor.get_catalog());
        processMap.put("get_catalogs", new ThriftHiveMetastore.Processor.get_catalogs());
        processMap.put("drop_catalog", new ThriftHiveMetastore.Processor.drop_catalog());
        processMap.put("create_database", new ThriftHiveMetastore.Processor.create_database());
        processMap.put("get_database", new ThriftHiveMetastore.Processor.get_database());
        processMap.put("drop_database", new ThriftHiveMetastore.Processor.drop_database());
        ...
    }
}
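In other words, only methods registered in processMap are recognized; any other method name results in an exception. This logic lives in TBaseProcessor, an ancestor class of Processor: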
public abstract class TBaseProcessor<I> implements TProcessor {
    private final I iface;
    private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

    protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
        this.iface = iface;
        this.processMap = processFunctionMap;
    }

    public Map<String, ProcessFunction<I, ? extends TBase>> getProcessMapView() {
        return Collections.unmodifiableMap(this.processMap);
    }

    public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();
        // Look up the handler by the method name carried in the message header.
        ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
        if (fn == null) {
            // Unknown method: skip the argument struct (12 = TType.STRUCT) and reply with an exception.
            TProtocolUtil.skip(in, (byte)12);
            in.readMessageEnd();
            // 1 = TApplicationException.UNKNOWN_METHOD
            TApplicationException x = new TApplicationException(1, "Invalid method name: '" + msg.name + "'");
            // 3 = TMessageType.EXCEPTION
            out.writeMessageBegin(new TMessage(msg.name, (byte)3, msg.seqid));
            x.write(out);
            out.writeMessageEnd();
            out.getTransport().flush();
            return true;
        } else {
            fn.process(msg.seqid, in, out, this.iface);
            return true;
        }
    }
}
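The name-based lookup in `process` can be reduced to a few lines of plain Java. This is a simplified model, not Thrift's API: `DispatchSketch` and its string-to-string handlers are hypothetical stand-ins for `TBaseProcessor` and `ProcessFunction`, and the error is returned as a string rather than written back as a `TApplicationException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Thrift-free model of dispatch by method name. All names are hypothetical. */
class DispatchSketch {
    // Method name -> handler, playing the role of processMap.
    private final Map<String, Function<String, String>> processMap = new HashMap<>();

    DispatchSketch() {
        processMap.put("get_database", name -> "database:" + name);
        processMap.put("get_catalog", name -> "catalog:" + name);
    }

    /** Looks up the handler by name; unknown names produce an error reply. */
    String process(String methodName, String argument) {
        Function<String, String> fn = processMap.get(methodName);
        if (fn == null) {
            return "Invalid method name: '" + methodName + "'";
        }
        return fn.apply(argument);
    }

    public static void main(String[] args) {
        DispatchSketch processor = new DispatchSketch();
        System.out.println(processor.process("get_database", "default"));
        // get_indexes is not registered, mirroring a Hive 2 client calling HMS 3.
        System.out.println(processor.process("get_indexes", "some_table"));
    }
}
```

Note that an unknown method does not terminate the connection in the real code either: `process` still returns true, so the worker keeps serving further requests from the same client.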
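In process, the method name sent by the client is looked up in the processMap mapping; if no entry is found, an exception is returned to the client. So how can an unregistered method be added? Look at the constructors of Processor: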
// ThriftHiveMetastore.class (abridged)
public static class Processor<I extends ThriftHiveMetastore.Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap()));
    }

    // Note: this constructor accepts a caller-supplied processMap.
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }
    ...
}
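A processMap can be passed in, but the constructor that accepts it is protected, so it cannot be used to create objects directly. The only way to use it is to extend the class through inheritance, for example: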
public static class MetastoreProcessor<I extends ThriftHiveMetastore.Iface> extends ThriftHiveMetastore.Processor<I> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetastoreProcessor.class.getName());

    public MetastoreProcessor(I iface) {
        super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
    }

    protected MetastoreProcessor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    // Registers the HMS 2 index methods. The parent constructor then adds its own entries to the same map.
    // add_index, alter_index, etc. are ProcessFunction classes that must be supplied separately; they are not shown in this post.
    private static <I extends ThriftHiveMetastore.Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        processMap.put("add_index", new add_index());
        processMap.put("alter_index", new alter_index());
        processMap.put("drop_index_by_name", new drop_index_by_name());
        processMap.put("get_index_by_name", new get_index_by_name());
        processMap.put("get_indexes", new get_indexes());
        processMap.put("get_index_names", new get_index_names());
        return processMap;
    }
}
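The inheritance trick above works because the parent's protected constructor adds its own entries into whatever map it receives, so entries the subclass registered beforehand under new names survive. A minimal Thrift-free sketch of that pattern follows; `ExtensionSketch`, `BaseProcessor`, `ExtendedProcessor`, and the string handlers are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Thrift-free model of extending a processor's method map by inheritance. All names are hypothetical. */
class ExtensionSketch {

    static class BaseProcessor {
        private final Map<String, Function<String, String>> processMap;

        public BaseProcessor() {
            this(new HashMap<>());
        }

        // Protected: only subclasses can supply a pre-populated map.
        protected BaseProcessor(Map<String, Function<String, String>> processMap) {
            this.processMap = registerBase(processMap);
        }

        // Adds the built-in methods to the given map, keeping entries under other names.
        private static Map<String, Function<String, String>> registerBase(
                Map<String, Function<String, String>> map) {
            map.put("get_database", arg -> "database:" + arg);
            return map;
        }

        public String process(String methodName, String arg) {
            Function<String, String> fn = processMap.get(methodName);
            return fn == null ? "Invalid method name: '" + methodName + "'" : fn.apply(arg);
        }
    }

    static class ExtendedProcessor extends BaseProcessor {
        public ExtendedProcessor() {
            // A static helper may be called inside super(...) to pre-register extra methods.
            super(registerExtra(new HashMap<>()));
        }

        private static Map<String, Function<String, String>> registerExtra(
                Map<String, Function<String, String>> map) {
            map.put("get_indexes", arg -> "indexes:" + arg);
            return map;
        }
    }

    public static void main(String[] args) {
        BaseProcessor base = new BaseProcessor();
        BaseProcessor extended = new ExtendedProcessor();
        System.out.println(base.process("get_indexes", "t"));
        System.out.println(extended.process("get_indexes", "t"));
        System.out.println(extended.process("get_database", "default"));
    }
}
```

One consequence of this ordering is that the parent's registration runs last, so a subclass entry that reuses a built-in method name would be overwritten by the parent's handler.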
Then swap in the new TProcessor instance and rebuild the service:
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
IHMSHandler handler = HiveMetaStore.newRetryingHMSHandler(baseHandler, conf);
// Use the extended processor so the additional method names are recognized.
TProcessor processor = new MetastoreProcessor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);