Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析相关的知识,希望对你有一定的参考价值。

当前版本:IoTDB 0.11.2

1. 声明

当前内容主要为记录下Session执行一次sql查询的过程的源码解析。
前面内容中有发现IoTDB本身就是使用Thrift来实现客户端和服务器,并解析得到Session就是Client的包装产物,所以这里的交互一般都是直接调用服务器的接口完成的!

2. 分析查询的Demo

下面给出一个简单的查询Demo,然后对其进行分析操作

		Session session = new Session("localhost",6667,"root","root");
		try 
			session.open();
			SessionDataSet sessionDataSet = session.executeQueryStatement("select value1 from root.test");
			sessionDataSet.setFetchSize(1); // 只一个一个的获取
			int fetchSize = sessionDataSet.getFetchSize();
			System.out.println(fetchSize);
			int nextNum = 0;
			while(sessionDataSet.hasNext()) 
				RowRecord next = sessionDataSet.next(); // 如果获取的数据量
				System.out.println(next.getTimestamp());
				nextNum++;
			
			System.out.println("nextNum="+nextNum);
			sessionDataSet.closeOperationHandle(); 
		 catch (IoTDBConnectionException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		finally 
			try 
				session.close();
			 catch (IoTDBConnectionException e) 
				// TODO Auto-generated catch block
				e.printStackTrace();
			
		
		
	

这里采用了一个非常简单的查询执行

  1. 打开连接:open
  2. 执行sql :executeQueryStatement
  3. 得到结果:hasNext和next
  4. 关闭结果集:closeOperationHandle();
  5. 关闭连接:close

所以对于session的操作可以直接映射到服务端,服务端的具体实现是:TSServiceImpl这个实现类,实现了对Thrift的Server端的处理

1. 找到openSession方法(对应session.open)

该方法中使用了认证管理器对,给定的用户名和密码进行校验,对于登录成功的给与一个sessionId标记(默认从1开始)并缓存到服务器中,所以就会出现下面

也就是说,对于登录的用户给与一个sessionId,后面通过这个sessionId来判断该Client是否登录成功,所以open方式实际上就是登录并得到sessionId标记的方法

2. 找到executeQueryStatement方法(对应session.executeQueryStatement)


通过上面验证了,后续的发送请求中必须携带sessionId(并且还是登录的sessionId)才能发送请求到IoTDB的server端,发现这个其实就是对传递的sql语句进行解析为PhysicalPlan,后面通过执行器进行执行查询的操作(这里会返回查询结果)

并且从createQueryDataSet方法中可以知道会缓存QueryDataSet

3. sessionDataSet.hasNext()的源码分析

 public boolean hasNext() throws StatementExecutionException, IoTDBConnectionException 
   return ioTDBRpcDataSet.next();
 
 // ioTDBRpcDataSet.next()对应的方法
 public boolean next() throws StatementExecutionException, IoTDBConnectionException 
    if (hasCachedResults()) 
      constructOneRow();
      return true;
    
    if (emptyResultSet) 
      return false;
    
    if (fetchResults())  // 这个地方会请求server端的fetchResults方法
      constructOneRow();
      return true;
    
    return false;
  

所以hasNext方法就是为了确认数据是否存在,如果本地的数据迭代完毕后会尝试请求服务器判断是否还有数据!

4. 查看sessionDataSet.next();方法源码

public RowRecord next() throws StatementExecutionException, IoTDBConnectionException 
    if (!ioTDBRpcDataSet.hasCachedRecord && !hasNext()) 
      return null;
    
    ioTDBRpcDataSet.hasCachedRecord = false;

    return constructRowRecordFromValueArray();
  

其实这个地方还是会请求hasNext方法,并完成数据的构造为RowRecord实例

5. sessionDataSet.closeOperationHandle(); 方法的源码解析

public void closeOperationHandle() throws StatementExecutionException, IoTDBConnectionException 
    try 
      ioTDBRpcDataSet.close(); // 发现会调用close方法,但是实际上映射为TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
     catch (TException e) 
      throw new IoTDBConnectionException(e.getMessage());
    
  

所以该方法是直接请求server端的closeOperation方法

分析后发现该方法就是为了释放查询的QueryDataSet(这就是为什么要调用该方法的原因)

6. session.close方法对应closeSession方法

3. 总结

一个简单的Session查询操作过程中有:

  1. 打开session并认证用户名和密码,完成sessionid的缓存和回写
  2. 发送sql查询请求到server端,server端完成sql解析为计划,通过执行器执行查询计划并得到QueryDataSet,并缓存到内存中
  3. 迭代的过程中可能会涉及到向server端发送验证是否还有数据未传输完毕
  4. 关闭并释放当前sessionId的查询的资源
  5. 服务端移除sessionId,并完成该sessionId的查询资源释放

以上是关于Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Apache IoTDB源码解析(0.11.2版本):Session的源码解析

Apache IoTDB源码解析(0.11.2版本):RPC服务启动解析

Apache IoTDB源码解析(0.11.2版本):MultiFileLogNodeManager服务解析

Apache IoTDB源码解析(0.11.2版本):MultiFileLogNodeManager服务解析

Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析

Apache IoTDB源码解析(0.11.2版本):Session执行executeQueryStatement的源码解析