zookeeper源码解析--请求处理--PrepRequestProcessor

Posted raindayinrain

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper源码解析--请求处理--PrepRequestProcessor相关的知识,希望对你有一定的参考价值。

PrepRequestProcessor

1.从待处理请求队列取出一个请求
2.如果是requestOfDeath
	退出
3.置空请求对象的事务头,事务体。
4.请求未被过滤下,执行pRequestHelper
5.将请求交给下一级处理

pRequestHelper

对写请求设置
对请求进行合法性检查
按请求内容预先更新节点状态信息
为请求对象设置事务头,事务体,事务摘要

对请求处理中遭遇错误的,采用抛出异常方式
对错误的写请求,其事务头,事务体包含错误及错误信息,
对错误的请求,请求本身包含抛出异常信息。

如果请求类型为OpCode.createContainer/OpCode.create/OpCode.create2
动态创建CreateRequest对象
//基于原始请求对象,
//对请求进行合法性检查,
//预先对按请求进行节点更新&数据访问,
//设置好请求对象的事务头,事务体,事务摘要
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
如果请求类型为OpCode.createTTL
	CreateTTLRequest createTtlRequest = new CreateTTLRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
如果请求类型为OpCode.deleteContainer/OpCode.delete
    DeleteRequest deleteRequest = new DeleteRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
如果请求类型为OpCode.setData
    SetDataRequest setDataRequest = new SetDataRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
如果请求类型为OpCode.reconfig
    ReconfigRequest reconfigRequest = new ReconfigRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
    pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
如果请求类型为OpCode.setACL
    SetACLRequest setAclRequest = new SetACLRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
如果请求类型为OpCode.check
    CheckVersionRequest checkRequest = new CheckVersionRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
如果请求类型为OpCode.multi
    // 动态构造多操作请求,请求实体
    MultiOperationRecord multiRequest = new MultiOperationRecord();
    // 反向序列化得到数据实体
    try 
    
              ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
     
    catch (IOException e) 
    
          	// 抛出异常时,仍然设置事务头
              request.setHdr(
              	new TxnHeader(
            			request.sessionId, request.cxid, 
            			zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
              // 再次抛出
              throw e;
    
          
    // 事务容器
    List<Txn> txns = new ArrayList<Txn>();
    // 获取下一zxid
    long zxid = zks.getNextZxid();
    KeeperException ke = null;
    // 获取每个子操作&最新状态信息,子操作父节点&最新状态信息
    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
    // 设置请求的事务头
    request.setHdr(
          	new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
    // 对每个子操作逐个处理
    for (Op op : multiRequest) 
    
          	// 子操作的,请求实体
              Record subrequest = op.toRequestRecord();
              int type;
              Record txn;
              // 表示之前某个子操作遭遇异常
              if (ke != null) 
              
              	 // 设置类别
                  type = OpCode.error;
                  // 设置事务体
                  txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
               
              else 
              
                  try 
                  
                  	//基于原始请求对象,
					//对请求进行合法性检查,
					//预先对按请求进行节点更新&数据访问,
					//设置好请求对象的事务头,事务体,事务摘要
                    pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                    // 子操作类别
                    type = op.getType();
                    // 获得事务体
                    txn = request.getTxn();
                   
                  // 如果子操作在于异常
                  catch (KeeperException e) 
                  
                  	// 异常记录
                    ke = e;
                    // 类别记录
                    type = OpCode.error;
                    // 用于记录失败的事务体
                    txn = new ErrorTxn(e.code().intValue());
                    if (e.code().intValue() > Code.APIERROR.intValue()) 
                    
                          LOG.info(
                          	"Got user-level KeeperException when processing  aborting" 
                          	+ " remaining multi ops. Error Path: Error:", 
                          	+ request.toString(), e.getPath(), e.getMessage());
                    
			
			 		// 将异常信息设置到请求对象
                    request.setException(e);
                    // 一旦一个子操作遭遇异常,执行回滚
                    // 回滚需要保证复合操作每个子操作对ChangeRecord的影响撤销。
                    // 把ChangeRecord恢复到执行复合操作前
                    rollbackPendingChanges(zxid, pendingChanges);
                  
              

              try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) 
              
                  BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                  // 事务体对象序列化
                  txn.serialize(boa, "request");
                  ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                  // 存储到Txn对象集合
                  txns.add(new Txn(type, bb.array()));
              
          

 		  // 复合请求的事务体,也包含多个Txn分别对应每个子操作
          request.setTxn(new MultiTxn(txns));
          if (digestEnabled) 
          
          	 // 设置事务摘要 
              setTxnDigest(request);
          
 	
如果请求类型为OpCode.createSession/OpCode.closeSession
   	// 本地会话不需追踪?
    if (!request.isLocalSession()) 
    
              pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
    
如果请求类型为OpCode.sync/OpCode.exists/OpCode.getData/OpCode.getACL
    OpCode.getChildren/OpCode.getAllChildrenNumber/OpCode.getChildren2/OpCode.ping
    OpCode.setWatches/OpCode.setWatches2/OpCode.checkWatches/OpCode.removeWatches
    OpCode.getEphemerals/OpCode.multiRead/OpCode.addWatch/OpCode.addWatch
  	// 会话检查
  	// 只读请求不需要
  	zks.sessionTracker.checkSession(request.sessionId, request.getOwner());				

pRequest2TxnCreate(type, request, record, deserialize);

// 做新建节点前的合法性,权限,配额检查
// 一切合法下,更新父节点,新建节点状态信息。
// 设置好本请求对象的事务头,事务体,事务摘要信息

if (deserialize) 

	// 从字节流得到数据对象
	ByteBufferInputStream.byteBuffer2Record(request.request, record);


// 从数据对象提取信息
if (type == OpCode.createTTL) 

    CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
    flags = createTtlRequest.getFlags();
    path = createTtlRequest.getPath();
    acl = createTtlRequest.getAcl();
    data = createTtlRequest.getData();
    ttl = createTtlRequest.getTtl();
 
else 

    CreateRequest createRequest = (CreateRequest) record;
    flags = createRequest.getFlags();
    path = createRequest.getPath();
    acl = createRequest.getAcl();
    data = createRequest.getData();
    ttl = -1;


CreateMode createMode = CreateMode.fromFlag(flags);
// 合法性验证,不合法通过抛出特定类型包含特定信息异常来层层回退
validateCreateRequest(path, createMode, request, ttl);
// 父路径
String parentPath = validatePathForCreate(path, request.sessionId);
// 权限
List<ACL> listACL = fixupACL(path, request.authInfo, acl);
// 父亲路径最新状态
ChangeRecord parentRecord = getRecordForPath(parentPath);
// 权限检查
zks.checkACL(
	request.cnxn, parentRecord.acl, 
	ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
// 父亲路径下遭遇改变次数
int parentCVersion = parentRecord.stat.getCversion();
// 新节点完整路径
if (createMode.isSequential()) 

    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);


// 验证路径合法性
validatePath(path, request.sessionId);
try 

	// 如果要创建的路径已经存在了
    if (getRecordForPath(path) != null) 
    
    	// 抛出特定类型,包含特定信息的异常
        throw new KeeperException.NodeExistsException(path);
    
 
catch (KeeperException.NoNodeException e) 


   
boolean ephemeralParent = EphemeralType.get(
	parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
// 这类父路径不可包含子节点 
if (ephemeralParent) 

    throw new KeeperException.NoChildrenForEphemeralsException(path);

   
// 更新父节点遭遇变化次数
int newCversion = parentRecord.stat.getCversion() + 1;
// 检查配额限制,配额限制超出时,通过抛出特定类型,包含特定信息的异常来层层退出
zks.checkQuota(path, null, data, OpCode.create);
// 设置事务体
if (type == OpCode.createContainer) 

    request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
 
else if (type == OpCode.createTTL) 

    request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
 
else 

    request.setTxn(
    	new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));


TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
// 创建模式--创建容器节点
if (createMode.isContainer()) 

      ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
 
// 创建模式--创建自带生命期节点
else if (createMode.isTTL()) 

      ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
 
// 创建模式--创建临时节点【会话关闭,会话所属临时节点自动销毁】
else if (createMode.isEphemeral()) 

      ephemeralOwner = request.sessionId;


// 创建节点状态
StatPersisted s 
	= DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
// 基于原有父节点的parentRecord得到新的
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
// 记录了此节点下孩子累计新增孩子数量-变化为节点孩子变化
parentRecord.stat.setCversion(newCversion);
// 记录了最后修改此节点状态的事务id
parentRecord.stat.setPzxid(request.getHdr().getZxid());
// 更新整体和节点自身摘要信息
parentRecord.precalculatedDigest 
	= precalculateDigest(DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
// 一个节点可能包含多个关联项
zks.outstandingChanges.add(c);
// 一个节点,只有一个路径,也就只记录一个关联项,反映此节点最新状态
zks.outstandingChangesForPath.put(c.path, c);
// 反映新建节点
ChangeRecord nodeRecord 
	= new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest));
addChangeRecord(nodeRecord);

pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);

基于原始请求对象,
对请求进行合法性检查,
预先对按请求进行节点更新&数据访问,
设置好请求对象的事务头,事务体,事务摘要

如果请求对象事务头为null
	request.setHdr(
		new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
	如果类别为OpCode.create/OpCode.create2/OpCode.createTTL/OpCode.createContainer
		// 做新建节点前的合法性,权限,配额检查
		// 一切合法下,更新父节点,新建节点状态信息。
		// 设置好本请求对象的事务头,事务体,事务摘要信息
		pRequest2TxnCreate(type, request, record, deserialize);
	如果类别为OpCode.deleteContainer
		String path = new String(request.request.array(), UTF_8);
        // 获取父路径,及验证路径
        String parentPath = getParentPathAndValidate(path);
        // 获取删除路径最新信息
        ChangeRecord nodeRecord = getRecordForPath(path);
        // 如果删除节点包含子节点,不可删除
        if (nodeRecord.childCount > 0) 
        
            throw new KeeperException.NotEmptyException(path);
        
        
        // 要删除节点类别非法
        if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) 
        	== EphemeralType.NORMAL) 
        
            throw new KeeperException.BadVersionException(path);
        
        
        // 获取父节点最新状态
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        // 设置改请求的事务体
        request.setTxn(new DeleteTxn(path));
        // 基于parentRecord得到新的
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount--;
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest 
        	= precalculateDigest(DigestOpCode.UPDATE, 
        		parentPath, parentRecord.data, parentRecord.stat);
        // 完成对父节点最新状态的更新
        addChangeRecord(parentRecord);
        // 对删除节点,设置改节点ChangeRecord
        // 通过改ChangeRecord可辨别节点已被删除
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
        // 更新摘要信息
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
        // 设置事务摘要
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        // 完成对删除节点的状态更新
        addChangeRecord(nodeRecord);
   	如果类别为OpCode.delete
   		// 会话检查
   		zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        DeleteRequest deleteRequest = (DeleteRequest) record;
        // 反向序列化得到数据实体
        if (deserialize) 
        
            ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
        
        
        // 要删除路径
        String path = deleteRequest.getPath();
        // 要删除路径父路径
        String parentPath = getParentPathAndValidate(path);
        // 父路径最新状态
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        // 权限检查
        zks.checkACL(
        	request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
        // 要删除路径自身
        // 路径不存在时,通过设置异常类型,异常信息,层层退出
        ChangeRecord nodeRecord = getRecordForPath(path);
        // 版本检查
        checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
        // 有孩子的节点不可被删除
        if (nodeRecord.childCount > 0) 
        
            throw new KeeperException.NotEmptyException(path);
        
        // 设置事务体
        request.setTxn(new DeleteTxn(path));
        // 基于parentRecord得到新的
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        // 更新父节点最新状态信息
        // 状态--孩子数量
        parentRecord.childCount--;
        // 状态--最后对其修改的事务的id
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        // 更新摘要
        parentRecord.precalculatedDigest 
        	= precalculateDigest(DigestOpCode.UPDATE, 
        	parentPath, parentRecord.data, parentRecord.stat);
        addChangeRecord(parentRecord);
        // 为要删除节点,动态构造ChangeRecord
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
        // 更新摘要
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
        // 设置事务摘要
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
 	如果类别为OpCode.setData
 		// 检查会话
 		zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        // 事务体
        SetDataRequest setDataRequest = (SetDataRequest) record;
        // 反向序列化得到数据对象
        if (deserialize) 
        
            ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
        
        // 路径
        path = setDataRequest.getPath();
        // 验证路径合法性
        validatePath(path, request.sessionId);
        // 获取此路径最新信息
        nodeRecord = getRecordForPath(path);
        // 权限检查
        zks.checkACL(
        	request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
        // 配额检查
        zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
        // 版本检查
        int newVersion = 
        	checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
        // 设置事务体
        request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
        // 基于nodeRecode得到新的
        nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
        // 新的相比旧的,版本变了
        nodeRecord.stat.setVersion(newVersion);
        // 修改时间变了
        nodeRecord.stat.setMtime(request.getHdr().getTime());
        // 最后修改的事务id变了
        nodeRecord.stat.setMzxid(zxid);
        // 节点数据内容变了
        nodeRecord.data = setDataRequest.getData();
        // 更新摘要
        nodeRecord.precalculatedDigest 
        	= precalculateDigest(DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
        // 设置事务摘要
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
    如果类别为OpCode.setACL
    	// 检查会话
    	zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        // 事务体
        SetACLRequest setAclRequest = (SetACLRequest) record;
        // 反向序列化得到数据对象
        if (deserialize) 
        
            ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
        
        // 路径
        path = setAclRequest.getPath();
        // 验证路径
        validatePath(path, request.sessionId);
        // 权限列表
        List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
        // 路径最新状态
        nodeRecord = getRecordForPath(path);
        // 权限检查
        zks.checkACL(
        	request.cnxn, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
        // 版本检查
        newVersion 
        	= checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
        // 设置事务体
        request.setTxn(new SetACLTxn(path, listACL, newVersion));
        // 基于nodeRecord得到新的
        nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
        // 设置新的版本
        nodeRecord.stat.setAversion(newVersion);
        // 更新摘要信息
        nodeRecord.precalculatedDigest 
        	= precalculateDigest(DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
        // 设置事务摘要
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
   	如果类型为OpCode.createSession
   		request.request.rewind();
   		// 获取超时时间?
        int to = request.request.getInt();
        // 设置事务体
        request.setTxn(new CreateSessionTxn(to));
        request.request.rewind();
        // 会话追踪
        zks.sessionTracker.trackSession(request.sessionId, to);
        // 会话拥有者,为请求拥有者
        zks.setOwner(request.sessionId, request.getOwner());
    如果类型为OpCode.closeSession
    	// 当前时间
    	long startTime = Time.currentElapsedTime();
        synchronized (zks.outstandingChanges) 
        
        	// 获取属于此会话的所有临时节点
            Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
            // 对所有排队中ChangeRecord
            for (ChangeRecord c : zks.outstandingChanges) 
            
            	// 表示待实施请求中,已经包含删除某路径请求
                if (c.stat == null) 
                
                	// 将此会话下已经规划为删除的路径从这里的临时节点中去掉,避免重复删除
                    es.remove(c.path);
                 
                // 表示待实施请求中,包含某个路径属于此临时会话
                else if (c.stat.getEphemeralOwner() == request.sessionId) 
                
                	// 将改路径加入这里的临时节点。放置漏掉属于此会话的待实施的临时节点。
                    es.add(c.path);
                
            
			
			// 对此会话的每个临时节点,对其父节点进行更新
            for (String path2Delete : es) 
            
                if (digestEnabled) 
                
                	// 获取删除节点,父节点&验证
                    parentPath = getParentPathAndValidate(path2Delete);
                    // 获取父节点最新状态
                    parentRecord = getRecordForPath(parentPath);
                    // 基于parentRecord得到新的
                    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                    // 更新修改父节点孩子的事务id
                    parentRecord.stat.setPzxid(request.getHdr().getZxid());
                    // 更新摘要
                    parentRecord.precalculatedDigest 
                    	= precalculateDigest(DigestOpCode.UPDATE, 
                    		parentPath, parentRecord.data, parentRecord.stat);
                    addChangeRecord(parentRecord);
                
                
                // 动态构造要删除节点的nodeRecord
                nodeRecord = 
                	new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
                // 更新摘要
                nodeRecord.precalculatedDigest 
                	= precalculateDigest(DigestOpCode.REMOVE, path2Delete);
                addChangeRecord(nodeRecord);
            

            if (ZooKeeperServer.isCloseSessionTxnEnabled()) 
            
            	// 设置事务体
                request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
            
            
            // 设置会话状态为关闭中
            zks.sessionTracker.setSessionClosing(request.sessionId);
        

        ServerMetrics.getMetrics()
        	.CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
	如果类型为OpCode.check
		// 会话检查
		zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
        // 反向序列化得到数据实体
        if (deserialize) 
        
            ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
        
        // 获取路径
        path = checkVersionRequest.getPath();
        // 验证路径
        validatePath(path, request.sessionId);
        // 获取路径最新信息
        nodeRecord = getRecordForPath(path);
        // 权限检查
        zks.checkACL(
        	request.cnxn, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo, path, null);
        // 设置事务体
        request.setTxn(
        	new CheckVersionTxn(path, 
        		checkAndIncVersion(nodeRecord.stat.getVersion(), 
        		checkVersionRequest.getVersion(), null));

以上是关于zookeeper源码解析--请求处理--PrepRequestProcessor的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper源码分析之请求处理链

zookeeper(13)源码分析-请求处理链

zookeeper源码分析:选举流程和请求处理

zookeeper(16)源码分析-ZAB协议

ZooKeeper客户端源码——向服务端发起请求(顺序响应+同步阻塞+异步回调)

zookeeper原理解析-服务器端处理流程