How to Extend the Hive Metastore Thrift RPC Service Interface

Posted by 咬定青松


Preface: this article was compiled by the editors of 小常识网 (cha138.com). It explains how to extend the Hive Metastore Thrift RPC service interface, and we hope you find it useful.

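Hive has been evolving for more than a decade and is now widely used. As it has been upgraded, the protocol interfaces have changed between versions. HMS tries hard to stay backward compatible, but full compatibility is not guaranteed across major versions. For example, between HMS 2 and HMS 3 the index-related methods were dropped, and constraint-related methods appear in their place. In the figure below, the left side shows the HMS 3 interface definition and the right side shows the HMS 2 definition: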

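As a result, when a Hive 2 client calls one of these removed methods on an HMS 3 server, the call fails with an "Invalid method name" exception. A Hive Metastore proxy service can solve this problem.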
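Before building such a proxy, it helps to know exactly which method names an old client can send that the new server no longer registers. The minimal sketch below assumes both versions' generated Iface interfaces are at hand and diffs their method names with reflection. Hms2Iface and Hms3Iface are hypothetical, heavily trimmed stand-ins with simplified signatures. The real generated interfaces share the same fully qualified class name, so in practice they would have to be loaded through separate class loaders.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical, trimmed stand-in for the Hive 2 generated ThriftHiveMetastore.Iface.
interface Hms2Iface {
    void get_database(String name);
    void add_index(String index);
    void get_indexes(String db, String table);
}

// Hypothetical, trimmed stand-in for the Hive 3 generated ThriftHiveMetastore.Iface.
interface Hms3Iface {
    void get_database(String name);
    void create_catalog(String catalog);
}

class IfaceDiff {
    // Returns the RPC method names declared by `client` but absent from `server`:
    // these are the calls that would be answered with "Invalid method name".
    static Set<String> missingOnServer(Class<?> client, Class<?> server) {
        Set<String> serverNames = Arrays.stream(server.getMethods())
                .map(Method::getName)
                .collect(Collectors.toSet());
        return Arrays.stream(client.getMethods())
                .map(Method::getName)
                .filter(name -> !serverNames.contains(name))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        System.out.println(missingOnServer(Hms2Iface.class, Hms3Iface.class));
    }
}
```

Running main prints [add_index, get_indexes]. These are exactly the names a compatibility proxy has to register.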

Before introducing the Hive Metastore proxy service, let's look at how the Hive Metastore service handles requests:

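// Simplified from the HMS startup code; conf and serverTransport are created earlier
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
// Wrap the base handler so failed metastore calls are retried
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
// The generated Processor maps each RPC method name to its implementation
TProcessor processor = new ThriftHiveMetastore.Processor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer
    .Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);

TServer server = new TThreadPoolServer(serverArgs);
server.setServerEventHandler(getServerEventHandler());
// Blocks: accepts connections and dispatches them until the server is stopped
server.serve();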

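The code above starts the Hive Metastore service. Two of its components are important here:

  • TThreadPoolServer: listens for incoming requests and dispatches them

  • TProcessor: executes the implementation of each RPC method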
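The division of labor between these two components can be modeled without Thrift. The sketch below is an in-memory illustration and not Thrift code. serve() plays the role of TThreadPoolServer, with the loop over a list standing in for accept(). Each pooled worker drains exactly one connection, and process() plays the role of the TProcessor. MiniThreadPoolServer and Connection are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory model of the TThreadPoolServer pattern (not Thrift code).
class MiniThreadPoolServer {
    // Hypothetical stand-in for a client connection: the RPC names it will send.
    static final class Connection {
        final Deque<String> pending;

        Connection(List<String> requests) {
            this.pending = new ArrayDeque<>(requests);
        }
    }

    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final AtomicInteger handled = new AtomicInteger();
    private volatile boolean stopped = false; // mirrors stopped_

    void stop() {
        stopped = true;
    }

    // Plays the role of processor.process(in, out): handle one request and
    // return false once the client has nothing more to send.
    private boolean process(Connection conn) {
        if (conn.pending.poll() == null) {
            return false;
        }
        handled.incrementAndGet();
        return true;
    }

    // Plays the role of serve(): iterating `incoming` stands in for accept().
    int serve(List<Connection> incoming) {
        for (Connection conn : incoming) {
            if (stopped) {
                break;
            }
            executor.execute(() -> {
                // One worker owns one connection until it is drained.
                while (!stopped && process(conn)) {
                    // process() already handled one request
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        MiniThreadPoolServer server = new MiniThreadPoolServer();
        int n = server.serve(Arrays.asList(
                new Connection(Arrays.asList("get_database", "get_table")),
                new Connection(Arrays.asList("get_all_databases"))));
        System.out.println("handled " + n + " requests");
    }
}
```

The accepting loop never does request work itself. A slow client therefore ties up only its own worker, which is the same trade-off TThreadPoolServer makes by dedicating a pool thread to each connection.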

TThreadPoolServer overview

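TThreadPoolServer listens for and dispatches requests. When a new client connects, the server submits that client's connection to its worker thread pool and immediately goes back to listening. Each worker thread serves exactly one client and handles its requests until the connection is finished: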

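// TThreadPoolServer.class (decompiled, abridged)
public class TThreadPoolServer extends TServer {
    private ExecutorService executorService_;

    public void serve() {
        // ...
        while (!this.stopped_) {
            // Block until a new client connects
            TTransport client = this.serverTransport_.accept();
            // Hand the whole connection to a pooled worker, then go back to accept()
            TThreadPoolServer.WorkerProcess wp = new TThreadPoolServer.WorkerProcess(client);
            this.executorService_.execute(wp);
        }
        // ...
        this.executorService_.shutdown();
    }
    // ...
    private class WorkerProcess implements Runnable {
        private TTransport client_;

        private WorkerProcess(TTransport client) {
            this.client_ = client;
        }

        public void run() {
            // processor, input/output transports and protocols, eventHandler and
            // connectionContext are obtained for client_ here (omitted)
            do {
                if (eventHandler != null) {
                    eventHandler.processContext(connectionContext, inputTransport, outputTransport);
                }
                // Each process() call reads and answers one RPC on this connection
            } while (!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));
            if (this.client_.isOpen()) {
                this.client_.close();
            }
        }
    }
}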

TProcessor overview

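The previous section showed how WorkerProcess works: TThreadPoolServer hands each client connection as a whole to the TProcessor. It did not show how processor.process tells the different RPC methods apart. That dispatch is also the TProcessor's job. ThriftHiveMetastore.Processor, for example, defines a mapping of the methods it supports: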

// ThriftHiveMetastore.class (generated code, abridged)
public static class Processor<I extends ThriftHiveMetastore.Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap()));
    }

    // Note: processMap
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    // Registers every RPC method name of the service with its ProcessFunction
    private static <I extends ThriftHiveMetastore.Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        processMap.put("create_catalog", new ThriftHiveMetastore.Processor.create_catalog());
        processMap.put("alter_catalog", new ThriftHiveMetastore.Processor.alter_catalog());
        processMap.put("get_catalog", new ThriftHiveMetastore.Processor.get_catalog());
        processMap.put("get_catalogs", new ThriftHiveMetastore.Processor.get_catalogs());
        processMap.put("drop_catalog", new ThriftHiveMetastore.Processor.drop_catalog());
        processMap.put("create_database", new ThriftHiveMetastore.Processor.create_database());
        processMap.put("get_database", new ThriftHiveMetastore.Processor.get_database());
        processMap.put("drop_database", new ThriftHiveMetastore.Processor.drop_database());
        // ...
        return processMap;
    }
}

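In other words, only methods registered in processMap are recognized, and calling any other method produces an exception. This logic lives in TBaseProcessor, the base class that Processor ultimately extends: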

public abstract class TBaseProcessor<I> implements TProcessor {
    private final I iface;
    private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

    protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
        this.iface = iface;
        this.processMap = processFunctionMap;
    }

    public Map<String, ProcessFunction<I, ? extends TBase>> getProcessMapView() {
        return Collections.unmodifiableMap(this.processMap);
    }

    public boolean process(TProtocol in, TProtocol out) throws TException {
        // The message header carries the RPC method name and the sequence id
        TMessage msg = in.readMessageBegin();
        ProcessFunction fn = (ProcessFunction) this.processMap.get(msg.name);
        if (fn == null) {
            // Unknown method: skip the argument struct ((byte) 12 == TType.STRUCT)
            TProtocolUtil.skip(in, (byte) 12);
            in.readMessageEnd();
            // 1 == TApplicationException.UNKNOWN_METHOD
            TApplicationException x = new TApplicationException(1, "Invalid method name: '" + msg.name + "'");
            // (byte) 3 == TMessageType.EXCEPTION: reply with an exception message
            out.writeMessageBegin(new TMessage(msg.name, (byte) 3, msg.seqid));
            x.write(out);
            out.writeMessageEnd();
            out.getTransport().flush();
            return true;
        } else {
            // Known method: read the arguments, call iface, write the result
            fn.process(msg.seqid, in, out, this.iface);
            return true;
        }
    }
}
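The lookup in process() reduces to a map from method name to handler. The sketch below is a plain-Java model of that lookup and not Thrift code. A Function<String, String> stands in for ProcessFunction, and the hypothetical MiniProcessor returns the error text as a string instead of writing a TApplicationException back to the client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of name-based dispatch as done in TBaseProcessor.process().
class MiniProcessor {
    private final Map<String, Function<String, String>> processMap;

    MiniProcessor(Map<String, Function<String, String>> processMap) {
        this.processMap = processMap;
    }

    String process(String methodName, String arg) {
        Function<String, String> fn = processMap.get(methodName);
        if (fn == null) {
            // Thrift sends this text inside a TApplicationException(UNKNOWN_METHOD) reply
            return "Invalid method name: '" + methodName + "'";
        }
        return fn.apply(arg);
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> map = new HashMap<>();
        // Hypothetical handler standing in for the generated get_database function
        map.put("get_database", db -> "Database(" + db + ")");
        MiniProcessor p = new MiniProcessor(map);
        System.out.println(p.process("get_database", "default")); // Database(default)
        System.out.println(p.process("get_indexes", "default"));  // Invalid method name: 'get_indexes'
    }
}
```

The dispatch depends only on the name in the message header. Adding an entry to the map is therefore enough to make a previously unknown method name routable.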

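The process method looks up the method name sent by the client in processMap. If there is no entry, it skips the request's arguments and replies with an "Invalid method name" exception. Because it still returns true, the connection stays open for further calls. So how can an unregistered method be added? Look at Processor's constructors: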

// ThriftHiveMetastore.class
public static class Processor<I extends ThriftHiveMetastore.Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap()));
    }

    // Note: processMap
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }
    // ...
}

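A processMap can be passed in, but only through a protected constructor. Code outside the package cannot call that constructor directly and has to extend Processor instead. Each constructor in the chain puts its own entries into the map it receives and passes that same map upward. A subclass can therefore register extra method names and still inherit every HMS 3 method, for example: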

public class MetastoreProcessor<I extends ThriftHiveMetastore.Iface> extends ThriftHiveMetastore.Processor<I> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MetastoreProcessor.class.getName());

    public MetastoreProcessor(I iface) {
        super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
    }

    protected MetastoreProcessor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    // Registers the HMS 2 index methods first; the parent constructor then adds all
    // HMS 3 methods to the same map. add_index, alter_index, ... are ProcessFunction
    // classes that the HMS 3 generated code no longer contains: the proxy must supply
    // them (e.g. adapted from the HMS 2 generated Processor) and map these calls onto
    // something the HMS 3 handler can serve.
    private static <I extends ThriftHiveMetastore.Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        processMap.put("add_index", new add_index());
        processMap.put("alter_index", new alter_index());
        processMap.put("drop_index_by_name", new drop_index_by_name());
        processMap.put("get_index_by_name", new get_index_by_name());
        processMap.put("get_indexes", new get_indexes());
        processMap.put("get_index_names", new get_index_names());
        return processMap;
    }
}

Then build the server with the new TProcessor instance and restart the service:

HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
// Use the extended processor so the HMS 2 index methods are registered as well
TProcessor processor = new MetastoreProcessor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer
    .Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);

TServer server = new TThreadPoolServer(serverArgs);
server.serve();
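MetastoreProcessor works because every constructor in the chain adds its entries to one shared map before passing that map upward. The sketch below is a plain-Java model of the chain and not Thrift code. BaseProcessor, Hms3Processor and CompatProcessor are hypothetical stand-ins for TBaseProcessor, ThriftHiveMetastore.Processor and MetastoreProcessor, and String labels stand in for ProcessFunction objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Stand-in for TBaseProcessor: keeps whatever map the subclasses built.
class BaseProcessor {
    private final Map<String, String> processMap;

    protected BaseProcessor(Map<String, String> processMap) {
        this.processMap = processMap;
    }

    Set<String> methodNames() {
        return Collections.unmodifiableSet(new TreeSet<>(processMap.keySet()));
    }

    String owner(String methodName) {
        return processMap.get(methodName);
    }
}

// Stand-in for ThriftHiveMetastore.Processor (HMS 3).
class Hms3Processor extends BaseProcessor {
    Hms3Processor() {
        this(new HashMap<>());
    }

    protected Hms3Processor(Map<String, String> processMap) {
        super(register(processMap));
    }

    private static Map<String, String> register(Map<String, String> m) {
        m.put("create_catalog", "hms3");
        m.put("get_database", "hms3");
        return m;
    }
}

// Stand-in for MetastoreProcessor: registers the HMS 2 index methods first,
// then the protected parent constructor adds the HMS 3 methods to the same map.
class CompatProcessor extends Hms3Processor {
    CompatProcessor() {
        super(register(new HashMap<>()));
    }

    private static Map<String, String> register(Map<String, String> m) {
        m.put("add_index", "compat");
        m.put("get_indexes", "compat");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(new CompatProcessor().methodNames());
    }
}
```

Running main prints [add_index, create_catalog, get_database, get_indexes]. Because the subclass's entries are added before the parent's, a parent entry with the same name would overwrite the subclass's. This pattern therefore suits adding missing methods rather than overriding existing ones.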
