Apache IoTDB源码解析(0.11.2版本):Session的源码解析

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache IoTDB源码解析(0.11.2版本):Session的源码解析相关的知识,希望对你有一定的参考价值。

1. 声明

当前内容主要为解析Apache IoTDB 0.11.2版本的Session的源码解析
通过前面的Apache Thrift的Demo,可以发现iotdb中的server是使用了thrift的并生成了对应的iotdb-thrift.jar的

thrift在生成后必定有客户端Client和实现的服务端Processor的,thrift的通信就是使用thrift中的Client来访问的

iotdb中的session操作为:

Session session = new Session(xxx,xxx,xxx);
session.open();
session.executeSql();
session.close();

2. 查看Session中的代码

1. 查看open方法

  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
   transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));

这个其实就是构建一个TTransport就是用来open用的

 if (!transport.isOpen()) 
      try 
        transport.open();
       catch (TTransportException e) 
        throw new IoTDBConnectionException(e);
      
    
    if (enableRPCCompression) 
      client = new TSIService.Client(new TCompactProtocol(transport));
     else 
      client = new TSIService.Client(new TBinaryProtocol(transport));
    

校验是否已经是开的,否则就打开连接,然后判断是否开启压缩(对应配置文件中的),构建对应的Client,TSIService就是thrift文件中定义并生成的

有了Client就可以实现与服务端进行通信了

 TSOpenSessionReq openReq = new TSOpenSessionReq();
    openReq.setUsername(username);
    openReq.setPassword(password);
    openReq.setZoneId(getTimeZone());
    try 
      TSOpenSessionResp openResp = client.openSession(openReq);
      RpcUtils.verifySuccess(openResp.getStatus());
      if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) 
        logger.warn("Protocol differ, Client version is , but Server version is ",
            protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue());
        if (openResp.getServerProtocolVersion().getValue() == 0) // less than 0.10
          throw new TException(String
              .format("Protocol not supported, Client version is %s, but Server version is %s",
                  protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue()));
        
      

通过Client携带用户名密码并发送openSession的一个请求,随后开始校验当前的状态是否正常(其实就是判断是否有错误,状态是否为SUCCESS),判断客户端和服务器的协议版本是否一致,低于0.10的就会报错

sessionId = openResp.getSessionId();
statementId = client.requestStatementId(sessionId);
 client = RpcUtils.newSynchronizedClient(client);

得到请求响应中的id并完成Client的statementId的设置(thrift文件定义发送数据查询请求必须要有statementId),最后使用同步方式锁住该client保证多线程情况下的请求

2. 查看close方法

 public synchronized void close() throws IoTDBConnectionException 
    if (isClosed) 
      return;
    
    TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
    try 
      client.closeSession(req);
     catch (TException e) 
      throw new IoTDBConnectionException(
          "Error occurs when closing session at server. Maybe server is down.", e);
     finally 
      isClosed = true;
      if (transport != null) 
        transport.close();
      
    
  

这里其实就是使用当前的sessionId并利用Client发送关闭Session的请求,最后关闭TTransport

结合综上,发现这个session实际上就是一个thrift版本的包装类,实现了同步?

3. 总结

1. 通过查看源码发现Session实际上就是thrift生成的Client的包装类,说明iotdb本身就是使用thrift的Server和Client来定义通信的

2. 本人通过查看发现iotdb的lib中有netty和jetty,将其全部移除掉(迁移到lib中)发现任然可以正常的使用iotdb证明了1的设想(也是可以发送写入数据的指令,jetty是用来访问metricService的、netty是用来构建MQTT的)


3. 了解iotdb必须从其中的组件进行了解才会有所体会

以上是关于Apache IoTDB源码解析(0.11.2版本):Session的源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Apache IoTDB源码解析(0.11.2版本):Session的源码解析

Apache IoTDB源码解析(0.11.2版本):RPC服务启动解析

Apache IoTDB源码解析(0.11.2版本):MultiFileLogNodeManager服务解析

Apache IoTDB源码解析(0.11.2版本):MultiFileLogNodeManager服务解析

Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析

Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析